HDFS-5318. Support read-only and read-write paths to shared replicas. (Contributed by Eric Sirianni)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569951 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-02-19 22:59:37 +00:00
parent 06b504f4a6
commit 2f341414dd
13 changed files with 413 additions and 41 deletions

View File

@ -765,6 +765,19 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
addResourceObject(new Resource(in, name));
}
/**
* Add a configuration resource.
*
* The properties of this resource will override properties of previously
* added resources, unless they were marked <a href="#Final">final</a>.
*
* @param conf Configuration object from which to load properties
*/
public void addResource(Configuration conf) {
addResourceObject(new Resource(conf.getProps()));
}
/**
* Reload configuration from previously added resources.

View File

@ -418,6 +418,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5973. add DomainSocket#shutdown method (cmccabe)
HDFS-5318. Support read-only and read-write paths to shared replicas.
(Eric Sirianni via Arpit Agarwal)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -1520,8 +1520,8 @@ public class PBHelper {
private static StorageState convertState(State state) {
switch(state) {
case READ_ONLY:
return StorageState.READ_ONLY;
case READ_ONLY_SHARED:
return StorageState.READ_ONLY_SHARED;
case NORMAL:
default:
return StorageState.NORMAL;
@ -1549,8 +1549,8 @@ public class PBHelper {
private static State convertState(StorageState state) {
switch(state) {
case READ_ONLY:
return DatanodeStorage.State.READ_ONLY;
case READ_ONLY_SHARED:
return DatanodeStorage.State.READ_ONLY_SHARED;
case NORMAL:
default:
return DatanodeStorage.State.NORMAL;

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -482,7 +483,10 @@ public class BlockManager {
chooseSourceDatanode(block, containingNodes,
containingLiveReplicasNodes, numReplicas,
UnderReplicatedBlocks.LEVEL);
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
@ -1021,7 +1025,7 @@ public class BlockManager {
*/
private void addToInvalidates(Block b) {
StringBuilder datanodes = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" ");
@ -1235,7 +1239,10 @@ public class BlockManager {
continue;
}
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// liveReplicaNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
@ -1475,15 +1482,16 @@ public class BlockManager {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getDatanodeUuid());
int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
corrupt++;
corrupt += countableReplica;
else if (node.isDecommissionInProgress() || node.isDecommissioned())
decommissioned++;
decommissioned += countableReplica;
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
excess += countableReplica;
} else {
nodesContainingLiveReplicas.add(storage);
live++;
live += countableReplica;
}
containingNodes.add(node);
// Check if this replica is corrupt
@ -2480,7 +2488,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (storage.areBlockContentsStale()) {
LOG.info("BLOCK* processOverReplicatedBlock: " +
@ -2809,7 +2817,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@ -2848,7 +2856,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
// else proceed with fast case
int live = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
live++;

View File

@ -605,7 +605,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+ storageType);
return false;
}
if (storage.getState() == State.READ_ONLY) {
if (storage.getState() == State.READ_ONLY_SHARED) {
logNodeIsNotChosen(storage, "storage is read-only");
return false;
}

View File

@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
* This class maintains the map from a block to its metadata.
* block's metadata currently includes blockCollection it belongs to and
@ -121,6 +125,22 @@ class BlocksMap {
return getStorages(blocks.get(b));
}
/**
* Searches for the block in the BlocksMap and
* returns {@link Iterable} of the storages the block belongs to
* <i>that are of the given {@link DatanodeStorage.State state}</i>.
*
* @param state DatanodeStorage state by which to filter the returned Iterable
*/
Iterable<DatanodeStorageInfo> getStorages(Block b, final DatanodeStorage.State state) {
return Iterables.filter(getStorages(blocks.get(b)), new Predicate<DatanodeStorageInfo>() {
@Override
public boolean apply(DatanodeStorageInfo storage) {
return storage.getState() == state;
}
});
}
/**
* For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} of the storages the block belongs to.

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
@ -378,11 +380,13 @@ public class NamenodeFsck {
boolean isCorrupt = lBlk.isCorrupt();
String blkName = block.toString();
DatanodeInfo[] locs = lBlk.getLocations();
res.totalReplicas += locs.length;
NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
int liveReplicas = numberReplicas.liveReplicas();
res.totalReplicas += liveReplicas;
short targetFileReplication = file.getReplication();
res.numExpectedReplicas += targetFileReplication;
if (locs.length > targetFileReplication) {
res.excessiveReplicas += (locs.length - targetFileReplication);
if (liveReplicas > targetFileReplication) {
res.excessiveReplicas += (liveReplicas - targetFileReplication);
res.numOverReplicatedBlocks += 1;
}
// Check if block is Corrupt
@ -392,10 +396,10 @@ public class NamenodeFsck {
out.print("\n" + path + ": CORRUPT blockpool " + block.getBlockPoolId() +
" block " + block.getBlockName()+"\n");
}
if (locs.length >= minReplication)
if (liveReplicas >= minReplication)
res.numMinReplicatedBlocks++;
if (locs.length < targetFileReplication && locs.length > 0) {
res.missingReplicas += (targetFileReplication - locs.length);
if (liveReplicas < targetFileReplication && liveReplicas > 0) {
res.missingReplicas += (targetFileReplication - liveReplicas);
res.numUnderReplicatedBlocks += 1;
underReplicatedPerFile++;
if (!showFiles) {
@ -404,7 +408,7 @@ public class NamenodeFsck {
out.println(" Under replicated " + block +
". Target Replicas is " +
targetFileReplication + " but found " +
locs.length + " replica(s).");
liveReplicas + " replica(s).");
}
// verify block placement policy
BlockPlacementStatus blockPlacementStatus = bpPolicy
@ -421,13 +425,13 @@ public class NamenodeFsck {
block + ". " + blockPlacementStatus.getErrorDescription());
}
report.append(i + ". " + blkName + " len=" + block.getNumBytes());
if (locs.length == 0) {
if (liveReplicas == 0) {
report.append(" MISSING!");
res.addMissing(block.toString(), block.getNumBytes());
missing++;
missize += block.getNumBytes();
} else {
report.append(" repl=" + locs.length);
report.append(" repl=" + liveReplicas);
if (showLocations || showRacks) {
StringBuilder sb = new StringBuilder("[");
for (int j = 0; j < locs.length; j++) {

View File

@ -28,7 +28,18 @@ public class DatanodeStorage {
/** The state of the storage. */
public enum State {
NORMAL,
READ_ONLY
/**
* A storage that represents a read-only path to replicas stored on a shared storage device.
* Replicas on {@link #READ_ONLY_SHARED} storage are not counted towards live replicas.
*
* <p>
* In certain implementations, a {@link #READ_ONLY_SHARED} storage may be correlated to
* its {@link #NORMAL} counterpart using the {@link DatanodeStorage#storageID}. This
* property should be used for debugging purposes only.
* </p>
*/
READ_ONLY_SHARED;
}
private final String storageID;

View File

@ -50,7 +50,7 @@ message DatanodeRegistrationProto {
message DatanodeStorageProto {
enum StorageState {
NORMAL = 0;
READ_ONLY = 1;
READ_ONLY_SHARED = 1;
}
required string storageUuid = 1;

View File

@ -157,6 +157,7 @@ public class MiniDFSCluster {
private boolean checkExitOnShutdown = true;
private boolean checkDataNodeAddrConfig = false;
private boolean checkDataNodeHostConfig = false;
private Configuration[] dnConfOverlays;
public Builder(Configuration conf) {
this.conf = conf;
@ -333,6 +334,19 @@ public class MiniDFSCluster {
return this;
}
/**
* Default: null
*
* An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
*
* Useful for setting specific per-DataNode configuration parameters.
*/
public Builder dataNodeConfOverlays(Configuration[] dnConfOverlays) {
this.dnConfOverlays = dnConfOverlays;
return this;
}
/**
* Construct the actual MiniDFSCluster
*/
@ -375,7 +389,8 @@ public class MiniDFSCluster {
builder.nnTopology,
builder.checkExitOnShutdown,
builder.checkDataNodeAddrConfig,
builder.checkDataNodeHostConfig);
builder.checkDataNodeHostConfig,
builder.dnConfOverlays);
}
public class DataNodeProperties {
@ -621,7 +636,7 @@ public class MiniDFSCluster {
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, null, racks, hosts,
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false);
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false, null);
}
private void initMiniDFSCluster(
@ -634,7 +649,8 @@ public class MiniDFSCluster {
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig)
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays)
throws IOException {
ExitUtil.disableSystemExit();
@ -699,7 +715,7 @@ public class MiniDFSCluster {
startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
dnStartOpt != null ? dnStartOpt : startOpt,
racks, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);
checkDataNodeAddrConfig, checkDataNodeHostConfig, dnConfOverlays);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@ -1102,7 +1118,7 @@ public class MiniDFSCluster {
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false, false);
simulatedCapacities, setupHostsFile, false, false, null);
}
/**
@ -1116,7 +1132,7 @@ public class MiniDFSCluster {
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, StorageType.DEFAULT, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false, null);
}
/**
@ -1143,7 +1159,8 @@ public class MiniDFSCluster {
* @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
*
* @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
* global MiniDFSCluster Configuration for the corresponding DataNode.
* @throws IllegalStateException if NameNode has been shutdown
*/
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
@ -1152,7 +1169,8 @@ public class MiniDFSCluster {
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
if (operation == StartupOption.RECOVER) {
return;
}
@ -1192,6 +1210,13 @@ public class MiniDFSCluster {
+ simulatedCapacities.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (dnConfOverlays != null
&& numDataNodes > dnConfOverlays.length) {
throw new IllegalArgumentException( "The length of dnConfOverlays ["
+ dnConfOverlays.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
String [] dnArgs = (operation == null ||
operation != StartupOption.ROLLBACK) ?
@ -1200,6 +1225,9 @@ public class MiniDFSCluster {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
if (dnConfOverlays != null) {
dnConf.addResource(dnConfOverlays[i]);
}
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {

View File

@ -210,7 +210,8 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
boolean checkDataNodeHostConfig,
Configuration[] dnConfOverlays) throws IOException {
startDataNodes(conf, numDataNodes, storageType, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);

View File

@ -34,6 +34,7 @@ import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -96,6 +97,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
public static final byte DEFAULT_DATABYTE = 9;
public static final String CONFIG_PROPERTY_STATE =
"dfs.datanode.simulateddatastorage.state";
private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL;
static final byte[] nullCrcFileData;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@ -325,9 +331,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid();
private final long capacity; // in bytes
private final DatanodeStorage dnStorage;
synchronized long getFree() {
return capacity - getUsed();
@ -365,8 +371,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
getBPStorage(bpid).free(amount);
}
SimulatedStorage(long cap) {
SimulatedStorage(long cap, DatanodeStorage.State state) {
capacity = cap;
dnStorage = new DatanodeStorage(
"SimulatedStorage-" + DatanodeStorage.generateUuid(),
state, StorageType.DEFAULT);
}
synchronized void addBlockPool(String bpid) {
@ -390,11 +399,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
String getStorageUuid() {
return storageUuid;
return dnStorage.getStorageID();
}
DatanodeStorage getDnStorage() {
return dnStorage;
}
synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(new DatanodeStorage(getStorageUuid()),
return new StorageReport(dnStorage,
false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed());
}
@ -417,7 +430,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
registerMBean(datanodeUuid);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
}
public synchronized void injectBlocks(String bpid,
@ -488,7 +502,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) {
return Collections.singletonMap(new DatanodeStorage(storage.storageUuid), getBlockReport(bpid));
return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
}
@Override // FsDatasetSpi

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Iterables;
/**
* Test proper {@link BlockManager} replication counting for {@link DatanodeStorage}s
* with {@link DatanodeStorage.State#READ_ONLY_SHARED READ_ONLY} state.
*
* Uses {@link SimulatedFSDataset} to inject read-only replicas into a DataNode.
*/
public class TestReadOnlySharedStorage {
public static final Log LOG = LogFactory.getLog(TestReadOnlySharedStorage.class);
private static short NUM_DATANODES = 3;
private static int RO_NODE_INDEX = 0;
private static final int BLOCK_SIZE = 1024;
private static final long seed = 0x1BADF00DL;
private static final Path PATH = new Path("/" + TestReadOnlySharedStorage.class.getName() + ".dat");
private static final int RETRIES = 10;
private Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private BlockManager blockManager;
private DatanodeManager datanodeManager;
private DatanodeInfo normalDataNode;
private DatanodeInfo readOnlyDataNode;
private Block block;
private ExtendedBlock extendedBlock;
/**
* Setup a {@link MiniDFSCluster}.
* Create a block with both {@link State#NORMAL} and {@link State#READ_ONLY_SHARED} replicas.
*/
@Before
public void setup() throws IOException, InterruptedException {
conf = new HdfsConfiguration();
SimulatedFSDataset.setFactory(conf);
Configuration[] overlays = new Configuration[NUM_DATANODES];
for (int i = 0; i < overlays.length; i++) {
overlays[i] = new Configuration();
if (i == RO_NODE_INDEX) {
overlays[i].setEnum(SimulatedFSDataset.CONFIG_PROPERTY_STATE,
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
}
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATANODES)
.dataNodeConfOverlays(overlays)
.build();
fs = cluster.getFileSystem();
blockManager = cluster.getNameNode().getNamesystem().getBlockManager();
datanodeManager = blockManager.getDatanodeManager();
client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0));
for (int i = 0; i < NUM_DATANODES; i++) {
DataNode dataNode = cluster.getDataNodes().get(i);
validateStorageState(
BlockManagerTestUtil.getStorageReportsForDatanode(
datanodeManager.getDatanode(dataNode.getDatanodeId())),
i == RO_NODE_INDEX
? READ_ONLY_SHARED
: NORMAL);
}
// Create a 1 block file
DFSTestUtil.createFile(fs, PATH, BLOCK_SIZE, BLOCK_SIZE,
BLOCK_SIZE, (short) 1, seed);
LocatedBlock locatedBlock = getLocatedBlock();
extendedBlock = locatedBlock.getBlock();
block = extendedBlock.getLocalBlock();
assertThat(locatedBlock.getLocations().length, is(1));
normalDataNode = locatedBlock.getLocations()[0];
readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId());
assertThat(normalDataNode, is(not(readOnlyDataNode)));
validateNumberReplicas(1);
// Inject the block into the datanode with READ_ONLY_SHARED storage
cluster.injectBlocks(RO_NODE_INDEX, Collections.singleton(block));
// There should now be 2 *locations* for the block
// Must wait until the NameNode has processed the block report for the injected blocks
waitForLocations(2);
}
@After
public void tearDown() throws IOException {
fs.delete(PATH, false);
if (cluster != null) {
fs.close();
cluster.shutdown();
cluster = null;
}
}
private void waitForLocations(int locations) throws IOException, InterruptedException {
for (int tries = 0; tries < RETRIES; )
try {
LocatedBlock locatedBlock = getLocatedBlock();
assertThat(locatedBlock.getLocations().length, is(locations));
break;
} catch (AssertionError e) {
if (++tries < RETRIES) {
Thread.sleep(1000);
} else {
throw e;
}
}
}
private LocatedBlock getLocatedBlock() throws IOException {
LocatedBlocks locatedBlocks = client.getLocatedBlocks(PATH.toString(), 0, BLOCK_SIZE);
assertThat(locatedBlocks.getLocatedBlocks().size(), is(1));
return Iterables.getOnlyElement(locatedBlocks.getLocatedBlocks());
}
private void validateStorageState(StorageReport[] storageReports, DatanodeStorage.State state) {
for (StorageReport storageReport : storageReports) {
DatanodeStorage storage = storageReport.getStorage();
assertThat(storage.getState(), is(state));
}
}
private void validateNumberReplicas(int expectedReplicas) throws IOException {
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
assertThat(numberReplicas.excessReplicas(), is(0));
assertThat(numberReplicas.corruptReplicas(), is(0));
assertThat(numberReplicas.decommissionedReplicas(), is(0));
assertThat(numberReplicas.replicasOnStaleNodes(), is(0));
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L));
assertThat(blockManager.getExcessBlocksCount(), is(0L));
}
/**
* Verify that <tt>READ_ONLY_SHARED</tt> replicas are <i>not</i> counted towards the overall
* replication count, but <i>are</i> included as replica locations returned to clients for reads.
*/
@Test
public void testReplicaCounting() throws Exception {
// There should only be 1 *replica* (the READ_ONLY_SHARED doesn't count)
validateNumberReplicas(1);
fs.setReplication(PATH, (short) 2);
// There should now be 3 *locations* for the block, and 2 *replicas*
waitForLocations(3);
validateNumberReplicas(2);
}
/**
* Verify that the NameNode is able to still use <tt>READ_ONLY_SHARED</tt> replicas even
* when the single NORMAL replica is offline (and the effective replication count is 0).
*/
@Test
public void testNormalReplicaOffline() throws Exception {
// Stop the datanode hosting the NORMAL replica
cluster.stopDataNode(normalDataNode.getXferAddr());
// Force NameNode to detect that the datanode is down
BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(), normalDataNode.getXferAddr());
// The live replica count should now be zero (since the NORMAL replica is offline)
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.liveReplicas(), is(0));
// The block should be reported as under-replicated
BlockManagerTestUtil.updateState(blockManager);
assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L));
// The BlockManager should be able to heal the replication count back to 1
// by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas
BlockManagerTestUtil.computeAllPendingWork(blockManager);
DFSTestUtil.waitForReplication(cluster, extendedBlock, 1, 1, 0);
// There should now be 2 *locations* for the block, and 1 *replica*
assertThat(getLocatedBlock().getLocations().length, is(2));
validateNumberReplicas(1);
}
/**
* Verify that corrupt <tt>READ_ONLY_SHARED</tt> replicas aren't counted
* towards the corrupt replicas total.
*/
@Test
public void testReadOnlyReplicaCorrupt() throws Exception {
// "Corrupt" a READ_ONLY_SHARED replica by reporting it as a bad replica
client.reportBadBlocks(new LocatedBlock[] {
new LocatedBlock(extendedBlock, new DatanodeInfo[] { readOnlyDataNode })
});
// There should now be only 1 *location* for the block as the READ_ONLY_SHARED is corrupt
waitForLocations(1);
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
NumberReplicas numberReplicas = blockManager.countNodes(block);
assertThat(numberReplicas.corruptReplicas(), is(0));
}
}