HDFS-5318. Merging r1569951 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1569959 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-02-19 23:08:29 +00:00
parent 287536ef9d
commit df72553cba
13 changed files with 415 additions and 42 deletions

View File

@ -765,6 +765,19 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
addResourceObject(new Resource(in, name)); addResourceObject(new Resource(in, name));
} }
/**
* Add a configuration resource.
*
* The properties of this resource will override properties of previously
* added resources, unless they were marked <a href="#Final">final</a>.
*
* @param conf Configuration object from which to load properties
*/
public void addResource(Configuration conf) {
addResourceObject(new Resource(conf.getProps()));
}
/** /**
* Reload configuration from previously added resources. * Reload configuration from previously added resources.

View File

@ -89,6 +89,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5973. add DomainSocket#shutdown method (cmccabe) HDFS-5973. add DomainSocket#shutdown method (cmccabe)
HDFS-5318. Support read-only and read-write paths to shared replicas.
(Eric Sirianni via Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -1497,8 +1497,8 @@ public class PBHelper {
private static StorageState convertState(State state) { private static StorageState convertState(State state) {
switch(state) { switch(state) {
case READ_ONLY: case READ_ONLY_SHARED:
return StorageState.READ_ONLY; return StorageState.READ_ONLY_SHARED;
case NORMAL: case NORMAL:
default: default:
return StorageState.NORMAL; return StorageState.NORMAL;
@ -1526,8 +1526,8 @@ public class PBHelper {
private static State convertState(StorageState state) { private static State convertState(StorageState state) {
switch(state) { switch(state) {
case READ_ONLY: case READ_ONLY_SHARED:
return DatanodeStorage.State.READ_ONLY; return DatanodeStorage.State.READ_ONLY_SHARED;
case NORMAL: case NORMAL:
default: default:
return DatanodeStorage.State.NORMAL; return DatanodeStorage.State.NORMAL;

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -482,7 +483,10 @@ public class BlockManager {
chooseSourceDatanode(block, containingNodes, chooseSourceDatanode(block, containingNodes,
containingLiveReplicasNodes, numReplicas, containingLiveReplicasNodes, numReplicas,
UnderReplicatedBlocks.LEVEL); UnderReplicatedBlocks.LEVEL);
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() + int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas(); numReplicas.decommissionedReplicas();
@ -1021,7 +1025,7 @@ public class BlockManager {
*/ */
private void addToInvalidates(Block b) { private void addToInvalidates(Block b) {
StringBuilder datanodes = new StringBuilder(); StringBuilder datanodes = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
invalidateBlocks.add(b, node, false); invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" "); datanodes.append(node).append(" ");
@ -1234,7 +1238,10 @@ public class BlockManager {
continue; continue;
} }
assert liveReplicaNodes.size() == numReplicas.liveReplicas(); // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending // do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() + numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block); pendingReplications.getNumReplicas(block);
@ -1478,15 +1485,16 @@ public class BlockManager {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks = LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid()); excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++; corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned()) else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned++; decommissioned += countableReplica;
else if (excessBlocks != null && excessBlocks.contains(block)) { else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++; excess += countableReplica;
} else { } else {
nodesContainingLiveReplicas.add(storage); nodesContainingLiveReplicas.add(storage);
live++; live += countableReplica;
} }
containingNodes.add(node); containingNodes.add(node);
// Check if this replica is corrupt // Check if this replica is corrupt
@ -2483,7 +2491,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>(); Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block); .getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (storage.areBlockContentsStale()) { if (storage.areBlockContentsStale()) {
LOG.info("BLOCK* processOverReplicatedBlock: " + LOG.info("BLOCK* processOverReplicatedBlock: " +
@ -2812,7 +2820,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
int excess = 0; int excess = 0;
int stale = 0; int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++; corrupt++;
@ -2851,7 +2859,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// else proceed with fast case // else proceed with fast case
int live = 0; int live = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node))) if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
live++; live++;

View File

@ -605,7 +605,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+ storageType); + storageType);
return false; return false;
} }
if (storage.getState() == State.READ_ONLY) { if (storage.getState() == State.READ_ONLY_SHARED) {
logNodeIsNotChosen(storage, "storage is read-only"); logNodeIsNotChosen(storage, "storage is read-only");
return false; return false;
} }

View File

@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator; import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/** /**
* This class maintains the map from a block to its metadata. * This class maintains the map from a block to its metadata.
* block's metadata currently includes blockCollection it belongs to and * block's metadata currently includes blockCollection it belongs to and
@ -121,6 +125,22 @@ class BlocksMap {
return getStorages(blocks.get(b)); return getStorages(blocks.get(b));
} }
/**
* Searches for the block in the BlocksMap and
* returns {@link Iterable} of the storages the block belongs to
* <i>that are of the given {@link DatanodeStorage.State state}</i>.
*
* @param state DatanodeStorage state by which to filter the returned Iterable
*/
Iterable<DatanodeStorageInfo> getStorages(Block b, final DatanodeStorage.State state) {
return Iterables.filter(getStorages(blocks.get(b)), new Predicate<DatanodeStorageInfo>() {
@Override
public boolean apply(DatanodeStorageInfo storage) {
return storage.getState() == state;
}
});
}
/** /**
* For a block that has already been retrieved from the BlocksMap * For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} of the storages the block belongs to. * returns {@link Iterable} of the storages the block belongs to.

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -378,11 +380,13 @@ public class NamenodeFsck {
boolean isCorrupt = lBlk.isCorrupt(); boolean isCorrupt = lBlk.isCorrupt();
String blkName = block.toString(); String blkName = block.toString();
DatanodeInfo[] locs = lBlk.getLocations(); DatanodeInfo[] locs = lBlk.getLocations();
res.totalReplicas += locs.length; NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas();
res.totalReplicas += liveReplicas;
short targetFileReplication = file.getReplication(); short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication; res.numExpectedReplicas += targetFileReplication;
if (locs.length > targetFileReplication) { if (liveReplicas > targetFileReplication) {
res.excessiveReplicas += (locs.length - targetFileReplication); res.excessiveReplicas += (liveReplicas - targetFileReplication);
res.numOverReplicatedBlocks += 1; res.numOverReplicatedBlocks += 1;
} }
// Check if block is Corrupt // Check if block is Corrupt
@ -392,10 +396,10 @@ public class NamenodeFsck {
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() + out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n"); " block " + block.getBlockName()+"\n");
} }
if (locs.length >= minReplication) if (liveReplicas >= minReplication)
res.numMinReplicatedBlocks++; res.numMinReplicatedBlocks++;
if (locs.length < targetFileReplication && locs.length > 0) { if (liveReplicas < targetFileReplication && liveReplicas > 0) {
res.missingReplicas += (targetFileReplication - locs.length); res.missingReplicas += (targetFileReplication - liveReplicas);
res.numUnderReplicatedBlocks += 1; res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++; underReplicatedPerFile++;
if (!showFiles) { if (!showFiles) {
@ -404,7 +408,7 @@ public class NamenodeFsck {
out.println(" Under replicated " + block + out.println(" Under replicated " + block +
". Target Replicas is " + ". Target Replicas is " +
targetFileReplication + " but found " + targetFileReplication + " but found " +
locs.length + " replica(s)."); liveReplicas + " replica(s).");
} }
// verify block placement policy // verify block placement policy
BlockPlacementStatus blockPlacementStatus = bpPolicy BlockPlacementStatus blockPlacementStatus = bpPolicy
@ -421,13 +425,13 @@ public class NamenodeFsck {
block + ". " + blockPlacementStatus.getErrorDescription()); block + ". " + blockPlacementStatus.getErrorDescription());
} }
report.append(i + ". " + blkName + " len=" + block.getNumBytes()); report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (locs.length == 0) { if (liveReplicas == 0) {
report.append(" MISSING!"); report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes()); res.addMissing(block.toString(), block.getNumBytes());
missing++; missing++;
missize += block.getNumBytes(); missize += block.getNumBytes();
} else { } else {
report.append(" repl=" + locs.length); report.append(" repl=" + liveReplicas);
if (showLocations || showRacks) { if (showLocations || showRacks) {
StringBuilder sb = new StringBuilder("["); StringBuilder sb = new StringBuilder("[");
for (int j = 0; j < locs.length; j++) { for (int j = 0; j < locs.length; j++) {

View File

@ -28,7 +28,18 @@ public class DatanodeStorage {
/** The state of the storage. */ /** The state of the storage. */
public enum State { public enum State {
NORMAL, NORMAL,
READ_ONLY
/**
* A storage that represents a read-only path to replicas stored on a shared storage device.
* Replicas on {@link #READ_ONLY_SHARED} storage are not counted towards live replicas.
*
* <p>
* In certain implementations, a {@link #READ_ONLY_SHARED} storage may be correlated to
* its {@link #NORMAL} counterpart using the {@link DatanodeStorage#storageID}. This
* property should be used for debugging purposes only.
* </p>
*/
READ_ONLY_SHARED;
} }
private final String storageID; private final String storageID;

View File

@ -49,7 +49,7 @@ message DatanodeRegistrationProto {
message DatanodeStorageProto { message DatanodeStorageProto {
enum StorageState { enum StorageState {
NORMAL = 0; NORMAL = 0;
READ_ONLY = 1; READ_ONLY_SHARED = 1;
} }
required string storageUuid = 1; required string storageUuid = 1;

View File

@ -157,6 +157,7 @@ public class MiniDFSCluster {
private boolean checkExitOnShutdown = true; private boolean checkExitOnShutdown = true;
private boolean checkDataNodeAddrConfig = false; private boolean checkDataNodeAddrConfig = false;
private boolean checkDataNodeHostConfig = false; private boolean checkDataNodeHostConfig = false;
private Configuration[] dnConfOverlays;
public Builder(Configuration conf) { public Builder(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -325,6 +326,19 @@ public class MiniDFSCluster {
return this; return this;
} }
/**
* Default: null
*
* An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
*
* Useful for setting specific per-DataNode configuration parameters.
*/
public Builder dataNodeConfOverlays(Configuration[] dnConfOverlays) {
this.dnConfOverlays = dnConfOverlays;
return this;
}
/** /**
* Construct the actual MiniDFSCluster * Construct the actual MiniDFSCluster
*/ */
@ -366,7 +380,8 @@ public class MiniDFSCluster {
builder.nnTopology, builder.nnTopology,
builder.checkExitOnShutdown, builder.checkExitOnShutdown,
builder.checkDataNodeAddrConfig, builder.checkDataNodeAddrConfig,
builder.checkDataNodeHostConfig); builder.checkDataNodeHostConfig,
builder.dnConfOverlays);
} }
public class DataNodeProperties { public class DataNodeProperties {
@ -604,7 +619,7 @@ public class MiniDFSCluster {
manageNameDfsDirs, true, true, manageDataDfsDirs, manageNameDfsDirs, true, true, manageDataDfsDirs,
operation, racks, hosts, operation, racks, hosts,
simulatedCapacities, null, true, false, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false); MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
} }
private void initMiniDFSCluster( private void initMiniDFSCluster(
@ -616,7 +631,8 @@ public class MiniDFSCluster {
boolean waitSafeMode, boolean setupHostsFile, boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown, MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays)
throws IOException { throws IOException {
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
@ -681,8 +697,9 @@ public class MiniDFSCluster {
} }
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig); operation, racks, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp(); waitClusterUp();
//make sure ProxyUsers uses the latest conf //make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@ -1077,7 +1094,7 @@ public class MiniDFSCluster {
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false, false); simulatedCapacities, setupHostsFile, false, false, null);
} }
/** /**
@ -1091,7 +1108,7 @@ public class MiniDFSCluster {
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false); simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
} }
/** /**
@ -1118,7 +1135,8 @@ public class MiniDFSCluster {
* @param setupHostsFile add new nodes to dfs hosts files * @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config * @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
* * @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
* @throws IllegalStateException if NameNode has been shutdown * @throws IllegalStateException if NameNode has been shutdown
*/ */
public synchronized void startDataNodes(Configuration conf, int numDataNodes, public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@ -1127,7 +1145,8 @@ public class MiniDFSCluster {
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException { boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
} }
@ -1168,6 +1187,13 @@ public class MiniDFSCluster {
+ "] is less than the number of datanodes [" + numDataNodes + "]."); + "] is less than the number of datanodes [" + numDataNodes + "].");
} }
if (dnConfOverlays != null
&& numDataNodes > dnConfOverlays.length) {
throw new IllegalArgumentException( "The length of dnConfOverlays ["
+ dnConfOverlays.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
String [] dnArgs = (operation == null || String [] dnArgs = (operation == null ||
operation != StartupOption.ROLLBACK) ? operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()}; null : new String[] {operation.getName()};
@ -1175,6 +1201,9 @@ public class MiniDFSCluster {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf); Configuration dnConf = new HdfsConfiguration(conf);
if (dnConfOverlays != null) {
dnConf.addResource(dnConfOverlays[i]);
}
// Set up datanode address // Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig); setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) { if (manageDfsDirs) {

View File

@ -210,7 +210,8 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig, boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException { boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig); checkDataNodeAddrConfig, checkDataNodeHostConfig);

View File

@ -34,6 +34,7 @@ import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -96,6 +97,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
public static final byte DEFAULT_DATABYTE = 9; public static final byte DEFAULT_DATABYTE = 9;
public static final String CONFIG_PROPERTY_STATE =
"dfs.datanode.simulateddatastorage.state";
private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL;
static final byte[] nullCrcFileData; static final byte[] nullCrcFileData;
static { static {
DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum checksum = DataChecksum.newDataChecksum(
@ -325,9 +331,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static class SimulatedStorage { private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map = private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>(); new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid();
private final long capacity; // in bytes private final long capacity; // in bytes
private final DatanodeStorage dnStorage;
synchronized long getFree() { synchronized long getFree() {
return capacity - getUsed(); return capacity - getUsed();
@ -365,8 +371,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
getBPStorage(bpid).free(amount); getBPStorage(bpid).free(amount);
} }
SimulatedStorage(long cap) { SimulatedStorage(long cap, DatanodeStorage.State state) {
capacity = cap; capacity = cap;
dnStorage = new DatanodeStorage(
"SimulatedStorage-" + DatanodeStorage.generateUuid(),
state, StorageType.DEFAULT);
} }
synchronized void addBlockPool(String bpid) { synchronized void addBlockPool(String bpid) {
@ -390,11 +399,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
String getStorageUuid() { String getStorageUuid() {
return storageUuid; return dnStorage.getStorageID();
}
DatanodeStorage getDnStorage() {
return dnStorage;
} }
synchronized StorageReport getStorageReport(String bpid) { synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(new DatanodeStorage(getStorageUuid()), return new StorageReport(dnStorage,
false, getCapacity(), getUsed(), getFree(), false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed()); map.get(bpid).getUsed());
} }
@ -417,7 +430,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
registerMBean(datanodeUuid); registerMBean(datanodeUuid);
this.storage = new SimulatedStorage( this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
} }
public synchronized void injectBlocks(String bpid, public synchronized void injectBlocks(String bpid,
@ -488,7 +502,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports( public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) { String bpid) {
return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid)); return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Iterables;
/**
* Test proper {@link BlockManager} replication counting for {@link DatanodeStorage}s
* with {@link DatanodeStorage.State#READ_ONLY_SHARED READ_ONLY} state.
*
* Uses {@link SimulatedFSDataset} to inject read-only replicas into a DataNode.
*/
public class TestReadOnlySharedStorage {
public static final Log LOG = LogFactory.getLog(TestReadOnlySharedStorage.class);
private static short NUM_DATANODES = 3;
private static int RO_NODE_INDEX = 0;
private static final int BLOCK_SIZE = 1024;
private static final long seed = 0x1BADF00DL;
private static final Path PATH = new Path("/" + TestReadOnlySharedStorage.class.getName() + ".dat");
private static final int RETRIES = 10;
private Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private BlockManager blockManager;
private DatanodeManager datanodeManager;
private DatanodeInfo normalDataNode;
private DatanodeInfo readOnlyDataNode;
private Block block;
private ExtendedBlock extendedBlock;
/**
* Setup a {@link MiniDFSCluster}.
* Create a block with both {@link State#NORMAL} and {@link State#READ_ONLY_SHARED} replicas.
*/
@Before
public void setup() throws IOException, InterruptedException {
conf = new HdfsConfiguration();
SimulatedFSDataset.setFactory(conf);
Configuration[] overlays = new Configuration[NUM_DATANODES];
for (int i = 0; i < overlays.length; i++) {
overlays[i] = new Configuration();
if (i == RO_NODE_INDEX) {
overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE,
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.dataNodeConfOverlays(overlays)
.build();
fs = cluster.getFileSystem();
blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
datanodeManager = blockManager.getDatanodeManager();
client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0));
for (int i = 0; i < NUM_DATANODES; i++) {
DataNode dataNode = cluster.getDataNodes().get(i);
validateStorageState(
BlockManagerTestUtil.getStorageReportsForDatanode(
datanodeManager.getDatanode(dataNode.getDatanodeId())),
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
// Create a 1 block file
DFSTestUtil.createFile(fs, PATH, BLOCK_SIZE, BLOCK_SIZE,
BLOCK_SIZE, (short) 1, seed);
LocatedBlock locatedBlock = getLocatedBlock();
extendedBlock = locatedBlock.getBlock();
block = extendedBlock.getLocalBlock();
assertThat(locatedBlock.getLocations().length, is(1));
normalDataNode = locatedBlock.getLocations()[0];
readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId());
assertThat(normalDataNode, is(not(readOnlyDataNode)));
validateNumberReplicas(1);
// Inject the block into the datanode with READ_ONLY_SHARED storage
cluster.injectBlocks(RO_NODE_INDEX, Collections.singleton(block));
// There should now be 2 *locations* for the block
// Must wait until the NameNode has processed the block report for the injected blocks
waitForLocations(2);
}
@After
public void tearDown() throws IOException {
fs.delete(PATH, false);
if (cluster != null) {
fs.close();
cluster.shutdown();
cluster = null;
}
}
private void waitForLocations(int locations) throws IOException, InterruptedException {
for (int tries = 0; tries < RETRIES; )
try {
LocatedBlock locatedBlock = getLocatedBlock();
assertThat(locatedBlock.getLocations().length, is(locations));
break;
} catch (AssertionError e) {
if (++tries < RETRIES) {
Thread.sleep(1000);
} else {
throw e;
}
}
}
private LocatedBlock getLocatedBlock() throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(PATH.toString(), 0, BLOCK_SIZE);
assertThat(locatedBlocks.getLocatedBlocks().size(), is(1));
return Iterables.getOnlyElement(locatedBlocks.getLocatedBlocks());
}
private void validateStorageState(StorageReport[] storageReports, DatanodeStorage.State state) {
for (StorageReport storageReport : storageReports) {
DatanodeStorage storage = storageReport.getStorage();
assertThat(storage.getState(), is(state));
}
}
private void validateNumberReplicas(int expectedReplicas) throws IOException {
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
assertThat(numberReplicas.excessReplicas(), is(0));
assertThat(numberReplicas.corruptReplicas(), is(0));
assertThat(numberReplicas.decommissionedReplicas(), is(0));
assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
/**
* Verify that <tt>READ_ONLY_SHARED</tt> replicas are <i>not</i> counted towards the overall
* replication count, but <i>are</i> included as replica locations returned to clients for reads.
*/
@Test
public void testReplicaCounting() throws Exception {
// There should only be 1 *replica* (the READ_ONLY_SHARED doesn't count)
validateNumberReplicas(1);
fs.setReplication(PATH, (short) 2);
// There should now be 3 *locations* for the block, and 2 *replicas*
waitForLocations(3);
validateNumberReplicas(2);
}
/**
* Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even
* when the single NORMAL replica is offline (and the effective replication count is 0).
*/
@Test
public void testNormalReplicaOffline() throws Exception {
// Stop the datanode hosting the NORMAL replica
cluster.stopDataNode(normalDataNode.getXferAddr());
// Force NameNode to detect that the datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(), normalDataNode.getXferAddr());
// The live replica count should now be zero (since the NORMAL replica is offline)
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(0));
// The block should be reported as under-replicated
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
// The BlockManager should be able to heal the replication count back to 1
// by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
BlockManagerTestUtil.computeAllPendingWork(blockManager);
DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0);
// There should now be 2 *locations* for the block, and 1 *replica*
assertThat(getLocatedBlock().getLocations().length, is(2));
validateNumberReplicas(1);
}
/**
* Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted
* towards the corrupt replicas total.
*/
@Test
public void testReadOnlyReplicaCorrupt() throws Exception {
// "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica
client.reportBadBlocks(new LocatedBlock[] {
new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode })
});
// There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt
waitForLocations(1);
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.corruptReplicas(), is(0));
}
}