HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.

Authored by Erik Krogen on 2017-12-18 11:36:22 -08:00, committed by Konstantin V Shvachko
parent 001008958d
commit 94576b17fb
3 changed files with 351 additions and 152 deletions
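In brief: SimulatedFSDataset previously kept a single SimulatedStorage and one flat per-block-pool blockMap; this patch moves the block maps into each SimulatedBPStorage and spreads blocks across a configurable list of storages keyed by block ID. A minimal standalone sketch of that assignment scheme, using Guava's LongMath as the patch does (the StoragePicker class here is illustrative, not part of the patch):

    import java.util.List;

    import com.google.common.math.LongMath;

    // Each block is deterministically assigned to one simulated storage by
    // taking its block ID modulo the storage count. LongMath.mod is used
    // instead of the % operator so that negative block IDs (which the tests
    // exercise via testWriteRead(negativeBlkID)) still yield a valid,
    // non-negative list index.
    class StoragePicker<S> {
      private final List<S> storages;

      StoragePicker(List<S> storages) {
        this.storages = storages;
      }

      S pick(long blockId) {
        return storages.get(LongMath.mod(blockId, storages.size()));
      }
    }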

SimulatedFSDataset.java

@@ -23,8 +23,8 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -37,11 +37,13 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;

+import com.google.common.math.LongMath;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -88,6 +90,7 @@
  */
 public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public final static int BYTE_MASK = 0xff;
+  private final static int DEFAULT_NUM_SIMULATED_DATA_DIRS = 1;

   static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
     @Override
     public SimulatedFSDataset newInstance(DataNode datanode,
@@ -100,10 +103,42 @@ public boolean isSimulated() {
       return true;
     }
   }

+  /**
+   * Used to change the default number of data storages and to mark the
+   * FSDataset as simulated.
+   */
+  static class TestUtilsFactory
+      extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
+    @Override
+    public FsDatasetTestUtils newInstance(DataNode datanode) {
+      return new FsDatasetImplTestUtils(datanode) {
+        @Override
+        public int getDefaultNumOfDataDirs() {
+          return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+        }
+      };
+    }
+
+    @Override
+    public boolean isSimulated() {
+      return true;
+    }
+
+    @Override
+    public int getDefaultNumOfDataDirs() {
+      return DEFAULT_NUM_SIMULATED_DATA_DIRS;
+    }
+  }
+
   public static void setFactory(Configuration conf) {
     conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
         Factory.class.getName());
+    conf.setClass("org.apache.hadoop.hdfs.server.datanode." +
+            "SimulatedFSDatasetTestUtilsFactory",
+        TestUtilsFactory.class, FsDatasetTestUtils.Factory.class
+    );
   }

   public static byte simulatedByte(Block b, long offsetInBlk) {
@@ -151,7 +186,7 @@ private class BInfo implements ReplicaInPipeline {
       if (theBlock.getNumBytes() < 0) {
         theBlock.setNumBytes(0);
       }
-      if (!storage.alloc(bpid, theBlock.getNumBytes())) {
+      if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) {
         // expected length - actual length may
         // be more - we find out at finalize
         DataNode.LOG.warn("Lack of free storage on a block alloc");
@@ -169,7 +204,7 @@ private class BInfo implements ReplicaInPipeline {
     @Override
     public String getStorageUuid() {
-      return storage.getStorageUuid();
+      return getStorage(theBlock).getStorageUuid();
     }

     @Override
@@ -226,12 +261,12 @@ synchronized void finalizeBlock(String bpid, long finalSize)
       // adjust if necessary
       long extraLen = finalSize - theBlock.getNumBytes();
       if (extraLen > 0) {
-        if (!storage.alloc(bpid,extraLen)) {
+        if (!getStorage(theBlock).alloc(bpid, extraLen)) {
           DataNode.LOG.warn("Lack of free storage on a block alloc");
           throw new IOException("Creating block, no free space available");
         }
       } else {
-        storage.free(bpid, -extraLen);
+        getStorage(theBlock).free(bpid, -extraLen);
       }
       theBlock.setNumBytes(finalSize);
@@ -271,7 +306,7 @@ synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
-            volume, fileIoProvider);
+            getStorage(theBlock).getVolume(), fileIoProvider);
       }
     }
@@ -368,6 +403,7 @@ public void stopWriter(long xceiverStopTimeout) throws IOException {
    */
   private static class SimulatedBPStorage {
     private long used;    // in bytes
+    private final Map<Block, BInfo> blockMap = new TreeMap<>();

     long getUsed() {
       return used;
@@ -381,6 +417,10 @@ void free(long amount) {
       used -= amount;
     }

+    Map<Block, BInfo> getBlockMap() {
+      return blockMap;
+    }
+
     SimulatedBPStorage() {
       used = 0;
     }
@@ -392,10 +432,11 @@ void free(long amount) {
    */
   private static class SimulatedStorage {
     private final Map<String, SimulatedBPStorage> map =
-        new HashMap<String, SimulatedBPStorage>();
+        new ConcurrentHashMap<>();

     private final long capacity;  // in bytes
     private final DatanodeStorage dnStorage;
+    private final SimulatedVolume volume;

     synchronized long getFree() {
       return capacity - getUsed();
@@ -433,11 +474,15 @@ synchronized void free(String bpid, long amount) throws IOException {
       getBPStorage(bpid).free(amount);
     }

-    SimulatedStorage(long cap, DatanodeStorage.State state) {
+    SimulatedStorage(long cap, DatanodeStorage.State state,
+        FileIoProvider fileIoProvider, Configuration conf) {
       capacity = cap;
       dnStorage = new DatanodeStorage(
           "SimulatedStorage-" + DatanodeStorage.generateUuid(),
           state, StorageType.DEFAULT);
+      DataNodeVolumeMetrics volumeMetrics =
+          DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
+      this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
     }

     synchronized void addBlockPool(String bpid) {
@@ -473,6 +518,18 @@ synchronized StorageReport getStorageReport(String bpid) {
           false, getCapacity(), getUsed(), getFree(),
           map.get(bpid).getUsed(), 0L);
     }
+
+    SimulatedVolume getVolume() {
+      return volume;
+    }
+
+    Map<Block, BInfo> getBlockMap(String bpid) throws IOException {
+      SimulatedBPStorage bpStorage = map.get(bpid);
+      if (bpStorage == null) {
+        throw new IOException("Nonexistent block pool: " + bpid);
+      }
+      return bpStorage.getBlockMap();
+    }
   }

   static class SimulatedVolume implements FsVolumeSpi {
@@ -601,10 +658,7 @@ public VolumeCheckResult check(VolumeCheckContext context)
     }
   }

-  private final Map<String, Map<Block, BInfo>> blockMap
-      = new ConcurrentHashMap<String, Map<Block,BInfo>>();
-  private final SimulatedStorage storage;
-  private final SimulatedVolume volume;
+  private final List<SimulatedStorage> storages;
   private final String datanodeUuid;
   private final DataNode datanode;
@@ -615,27 +669,30 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) {
   public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
     this.datanode = datanode;
-    if (storage != null) {
+    int storageCount;
+    if (storage != null && storage.getNumStorageDirs() > 0) {
+      storageCount = storage.getNumStorageDirs();
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
         DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
       }
       this.datanodeUuid = storage.getDatanodeUuid();
     } else {
+      storageCount = DataNode.getStorageLocations(conf).size();
       this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
     }

     registerMBean(datanodeUuid);
     this.fileIoProvider = new FileIoProvider(conf, datanode);
-    this.storage = new SimulatedStorage(
-        conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
-        conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
-    // TODO: per volume id or path
-    DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
-        datanodeUuid);
-    this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
-        volumeMetrics);
     this.datasetLock = new AutoCloseableLock();
+
+    this.storages = new ArrayList<>();
+    for (int i = 0; i < storageCount; i++) {
+      this.storages.add(new SimulatedStorage(
+          conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
+          conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE),
+          fileIoProvider, conf));
+    }
   }

   public synchronized void injectBlocks(String bpid,
@@ -651,33 +708,50 @@ public synchronized void injectBlocks(String bpid,
           throw new IOException("Block already exists in block list");
         }
       }
-      Map<Block, BInfo> map = blockMap.get(bpid);
-      if (map == null) {
-        map = new TreeMap<>();
-        blockMap.put(bpid, map);
+
+      for (SimulatedStorage storage : storages) {
+        storage.addBlockPool(bpid);
       }

       for (Block b: injectBlocks) {
         BInfo binfo = new BInfo(bpid, b, false);
-        map.put(binfo.theBlock, binfo);
+        getBlockMap(b, bpid).put(binfo.theBlock, binfo);
       }
     }
   }

-  /** Get a map for a given block pool Id */
-  private Map<Block, BInfo> getMap(String bpid) throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map == null) {
-      throw new IOException("Non existent blockpool " + bpid);
-    }
-    return map;
+  /** Get the storage that a given block lives within. */
+  private SimulatedStorage getStorage(Block b) {
+    return storages.get(LongMath.mod(b.getBlockId(), storages.size()));
+  }
+
+  /**
+   * Get the block map that a given block lives within, assuming it is within
+   * block pool bpid.
+   * @param b The block to look for
+   * @param bpid The block pool that contains b
+   * @return The block map (non-null)
+   * @throws IOException if bpid does not exist
+   */
+  private Map<Block, BInfo> getBlockMap(Block b, String bpid)
+      throws IOException {
+    return getStorage(b).getBlockMap(bpid);
+  }
+
+  /**
+   * Get the block map that a given block lives within.
+   * @param b The extended block to look for
+   * @return The block map (non-null)
+   * @throws IOException if b is in a nonexistent block pool
+   */
+  private Map<Block, BInfo> getBlockMap(ExtendedBlock b) throws IOException {
+    return getBlockMap(b.getLocalBlock(), b.getBlockPoolId());
   }

   @Override // FsDatasetSpi
   public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -687,20 +761,21 @@ public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
   @Override // FsDatasetSpi
   public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
     if (isValidRbw(b)) {
-      final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-      map.remove(b.getLocalBlock());
+      getBlockMap(b).remove(b.getLocalBlock());
     }
   }

-  synchronized BlockListAsLongs getBlockReport(String bpid) {
+  synchronized BlockListAsLongs getBlockReport(String bpid,
+      SimulatedStorage storage) {
     BlockListAsLongs.Builder report = BlockListAsLongs.builder();
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      for (BInfo b : map.values()) {
+    try {
+      for (BInfo b : storage.getBlockMap(bpid).values()) {
         if (b.isFinalized()) {
           report.add(b);
         }
       }
+    } catch (IOException ioe) {
+      DataNode.LOG.error("Exception while getting block reports", ioe);
     }
     return report.build();
   }
@@ -708,7 +783,11 @@ synchronized BlockListAsLongs getBlockReport(String bpid) {
   @Override
   public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
       String bpid) {
-    return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
+    Map<DatanodeStorage, BlockListAsLongs> blockReports = new HashMap<>();
+    for (SimulatedStorage storage : storages) {
+      blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage));
+    }
+    return blockReports;
   }

   @Override // FsDatasetSpi
@@ -718,27 +797,49 @@ public List<Long> getCacheReport(String bpid) {
   @Override // FSDatasetMBean
   public long getCapacity() {
-    return storage.getCapacity();
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getCapacity();
+    }
+    return total;
   }

   @Override // FSDatasetMBean
   public long getDfsUsed() {
-    return storage.getUsed();
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getUsed();
+    }
+    return total;
   }

   @Override // FSDatasetMBean
   public long getBlockPoolUsed(String bpid) throws IOException {
-    return storage.getBlockPoolUsed(bpid);
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getBlockPoolUsed(bpid);
+    }
+    return total;
   }

   @Override // FSDatasetMBean
   public long getRemaining() {
-    return storage.getFree();
+    long total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getFree();
+    }
+    return total;
   }

   @Override // FSDatasetMBean
   public int getNumFailedVolumes() {
-    return storage.getNumFailedVolumes();
+    int total = 0;
+    for (SimulatedStorage storage : storages) {
+      total += storage.getNumFailedVolumes();
+    }
+    return total;
   }

   @Override // FSDatasetMBean
@@ -803,8 +904,7 @@ public void getMetrics(MetricsCollector collector, boolean all) {
   @Override // FsDatasetSpi
   public synchronized long getLength(ExtendedBlock b) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
@@ -814,34 +914,38 @@ public synchronized long getLength(ExtendedBlock b) throws IOException {
   @Override
   @Deprecated
   public Replica getReplica(String bpid, long blockId) {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      return map.get(new Block(blockId));
+    Block b = new Block(blockId);
+    try {
+      return getBlockMap(b, bpid).get(b);
+    } catch (IOException ioe) {
+      return null;
     }
-    return null;
   }

   @Override
   public synchronized String getReplicaString(String bpid, long blockId) {
     Replica r = null;
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      r = map.get(new Block(blockId));
+    try {
+      Block b = new Block(blockId);
+      r = getBlockMap(b, bpid).get(b);
+    } catch (IOException ioe) {
+      // Ignore
     }
     return r == null? "null": r.toString();
   }

   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid) throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(bpid);
-    if (map != null) {
-      BInfo binfo = map.get(new Block(blkid));
+    Block b = new Block(blkid);
+    try {
+      BInfo binfo = getBlockMap(b, bpid).get(b);
       if (binfo == null) {
         return null;
       }
       return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
+    } catch (IOException ioe) {
+      return null;
     }
-    return null;
   }

   @Override // FsDatasetSpi
@@ -851,18 +955,18 @@ public synchronized void invalidate(String bpid, Block[] invalidBlks)
     if (invalidBlks == null) {
       return;
     }
-    final Map<Block, BInfo> map = getMap(bpid);
     for (Block b: invalidBlks) {
       if (b == null) {
         continue;
       }
+      Map<Block, BInfo> map = getBlockMap(b, bpid);
       BInfo binfo = map.get(b);
       if (binfo == null) {
         error = true;
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(bpid, binfo.getNumBytes());
+      getStorage(b).free(bpid, binfo.getNumBytes());
       map.remove(b);
       if (datanode != null) {
         datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
@@ -892,8 +996,11 @@ public boolean isCached(String bpid, long blockId) {
   }

   private BInfo getBInfo(final ExtendedBlock b) {
-    final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
-    return map == null? null: map.get(b.getLocalBlock());
+    try {
+      return getBlockMap(b).get(b.getLocalBlock());
+    } catch (IOException ioe) {
+      return null;
+    }
   }

   @Override // {@link FsDatasetSpi}
@@ -957,8 +1064,7 @@ public String toString() {
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler append(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
       throw new ReplicaNotFoundException("Block " + b
           + " is not valid, and cannot be appended to.");
@@ -970,7 +1076,7 @@ public synchronized ReplicaHandler append(
   @Override // FsDatasetSpi
   public synchronized ReplicaHandler recoverAppend(
       ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -988,7 +1094,7 @@ public synchronized ReplicaHandler recoverAppend(
   @Override // FsDatasetSpi
   public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -1007,7 +1113,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
   public synchronized ReplicaHandler recoverRbw(
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
+    final Map<Block, BInfo> map = getBlockMap(b);
     BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
       throw new ReplicaNotFoundException("Block " + b
@@ -1042,16 +1148,14 @@ public synchronized ReplicaHandler createTemporary(StorageType storageType,
       throw new ReplicaAlreadyExistsException("Block " + b +
           " is being written, and cannot be written to.");
     }
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
-    map.put(binfo.theBlock, binfo);
+    getBlockMap(b).put(binfo.theBlock, binfo);
     return new ReplicaHandler(binfo, null);
   }

   protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1077,8 +1181,7 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
   @Override // FsDatasetSpi
   public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
       ) throws IOException {
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1266,8 +1369,7 @@ public boolean hasEnoughResource() {
   public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
       throws IOException {
     ExtendedBlock b = rBlock.getBlock();
-    final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
-    BInfo binfo = map.get(b.getLocalBlock());
+    BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
@@ -1282,7 +1384,7 @@ public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
       long recoveryId,
       long newBlockId,
       long newlength) throws IOException {
-    return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
+    return getBInfo(oldBlock);
   }

   @Override // FsDatasetSpi
@@ -1292,15 +1394,16 @@ public long getReplicaVisibleLength(ExtendedBlock block) {
   @Override // FsDatasetSpi
   public void addBlockPool(String bpid, Configuration conf) {
-    Map<Block, BInfo> map = new TreeMap<>();
-    blockMap.put(bpid, map);
-    storage.addBlockPool(bpid);
+    for (SimulatedStorage storage : storages) {
+      storage.addBlockPool(bpid);
+    }
   }

   @Override // FsDatasetSpi
   public void shutdownBlockPool(String bpid) {
-    blockMap.remove(bpid);
-    storage.removeBlockPool(bpid);
+    for (SimulatedStorage storage : storages) {
+      storage.removeBlockPool(bpid);
+    }
   }

   @Override // FsDatasetSpi
@@ -1311,11 +1414,7 @@ public void deleteBlockPool(String bpid, boolean force) {
   @Override
   public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock temporary)
       throws IOException {
-    final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
-    if (map == null) {
-      throw new IOException("Block pool not found, temporary=" + temporary);
-    }
-    final BInfo r = map.get(temporary.getLocalBlock());
+    final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock());
     if (r == null) {
       throw new IOException("Block not found, temporary=" + temporary);
     } else if (r.isFinalized()) {
@@ -1359,7 +1458,11 @@ public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
   @Override
   public FsVolumeReferences getFsVolumeReferences() {
-    return new FsVolumeReferences(Collections.singletonList(volume));
+    List<SimulatedVolume> volumes = new ArrayList<>();
+    for (SimulatedStorage storage : storages) {
+      volumes.add(storage.getVolume());
+    }
+    return new FsVolumeReferences(volumes);
   }

   @Override
@@ -1371,14 +1474,21 @@ public void addVolume(
   @Override
   public DatanodeStorage getStorage(final String storageUuid) {
-    return storageUuid.equals(storage.getStorageUuid()) ?
-        storage.dnStorage :
-        null;
+    for (SimulatedStorage storage : storages) {
+      if (storageUuid.equals(storage.getStorageUuid())) {
+        return storage.getDnStorage();
+      }
+    }
+    return null;
   }

   @Override
   public StorageReport[] getStorageReports(String bpid) {
-    return new StorageReport[] {storage.getStorageReport(bpid)};
+    List<StorageReport> reports = new ArrayList<>();
+    for (SimulatedStorage storage : storages) {
+      reports.add(storage.getStorageReport(bpid));
+    }
+    return reports.toArray(new StorageReport[0]);
   }

   @Override
@@ -1393,7 +1503,7 @@ public Map<String, Object> getVolumeInfoMap() {
   @Override
   public FsVolumeSpi getVolume(ExtendedBlock b) {
-    return volume;
+    return getStorage(b.getLocalBlock()).getVolume();
   }

   @Override
@@ -1428,12 +1538,12 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
   @Override
   public void setPinning(ExtendedBlock b) throws IOException {
-    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+    getBlockMap(b).get(b.getLocalBlock()).pinned = true;
   }

   @Override
   public boolean getPinning(ExtendedBlock b) throws IOException {
-    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+    return getBlockMap(b).get(b.getLocalBlock()).pinned;
   }

   @Override
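Note that getBlockReports(bpid) above now returns one map entry per simulated storage rather than a singleton map, so callers that previously assumed a single report must aggregate across entries. A small sketch of that aggregation pattern, mirroring what the updated test helpers in TestSimulatedFSDataset below do (the BlockReportTotals class and method names here are illustrative, not part of the patch):

    import java.util.Map;

    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

    final class BlockReportTotals {
      // Sum the block counts across the per-storage reports returned by
      // SimulatedFSDataset#getBlockReports(String); with N configured
      // storages the map now holds N entries rather than one.
      static int totalBlocks(Map<DatanodeStorage, BlockListAsLongs> reports) {
        int total = 0;
        for (BlockListAsLongs report : reports.values()) {
          total += report.getNumberOfBlocks();
        }
        return total;
      }

      private BlockReportTotals() {
      }
    }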

TestSimulatedFSDataset.java

@@ -26,20 +26,19 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;

-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.SequentialBlockIdGenerator;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DataChecksum;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,6 +52,16 @@ public class TestSimulatedFSDataset {
   static final int BLOCK_LENGTH_MULTIPLIER = 79;
   static final long FIRST_BLK_ID = 1;

+  private final int storageCount;
+
+  public TestSimulatedFSDataset() {
+    this(1);
+  }
+
+  protected TestSimulatedFSDataset(int storageCount) {
+    this.storageCount = storageCount;
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
@@ -187,43 +196,28 @@ private void testWriteRead(boolean negativeBlkID) throws IOException {
   @Test
   public void testGetBlockReport() throws IOException {
-    SimulatedFSDataset fsdataset = getSimulatedFSDataset();
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
+    assertBlockReportCountAndSize(fsdataset, 0);
     addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);
   }

   @Test
   public void testInjectionEmpty() throws IOException {
     SimulatedFSDataset fsdataset = getSimulatedFSDataset();
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    assertBlockReportCountAndSize(fsdataset, 0);
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);

     // Inject blocks into an empty fsdataset
     //  - injecting the blocks we got above.
     SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
-    sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-      assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
-          .getLength(new ExtendedBlock(bpid, b)));
-    }
+    injectBlocksFromBlockReport(fsdataset, sfsdataset);
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset, sfsdataset);

     assertEquals(bytesAdded, sfsdataset.getDfsUsed());
     assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
   }
@@ -231,16 +225,10 @@ public void testInjectionEmpty() throws IOException {
   @Test
   public void testInjectionNonEmpty() throws IOException {
     SimulatedFSDataset fsdataset = getSimulatedFSDataset();
-    BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(0, blockReport.getNumberOfBlocks());
+    assertBlockReportCountAndSize(fsdataset, 0);
     int bytesAdded = addSomeBlocks(fsdataset);
-    blockReport = fsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-    }
-    fsdataset = null;
+    assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
+    assertBlockLengthInBlockReports(fsdataset);

     // Inject blocks into an non-empty fsdataset
     //  - injecting the blocks we got above.
@@ -248,19 +236,10 @@ public void testInjectionNonEmpty() throws IOException {
     // Add come blocks whose block ids do not conflict with
     // the ones we are going to inject.
     bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1, false);
-    sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
-    sfsdataset.injectBlocks(bpid, blockReport);
-    blockReport = sfsdataset.getBlockReport(bpid);
-    assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
-    for (Block b: blockReport) {
-      assertNotNull(b);
-      assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
-      assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
-          .getLength(new ExtendedBlock(bpid, b)));
-    }
+    assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS);
+    injectBlocksFromBlockReport(fsdataset, sfsdataset);
+    assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS * 2);
+    assertBlockLengthInBlockReports(fsdataset, sfsdataset);

     assertEquals(bytesAdded, sfsdataset.getDfsUsed());
     assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
@@ -270,7 +249,7 @@ public void testInjectionNonEmpty() throws IOException {
     try {
       sfsdataset = getSimulatedFSDataset();
       sfsdataset.addBlockPool(bpid, conf);
-      sfsdataset.injectBlocks(bpid, blockReport);
+      injectBlocksFromBlockReport(fsdataset, sfsdataset);
       assertTrue("Expected an IO exception", false);
     } catch (IOException e) {
       // ok - as expected
@@ -334,8 +313,68 @@ public void testInvalidate() throws IOException {
       assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
     }
   }

-  private SimulatedFSDataset getSimulatedFSDataset() {
+  /**
+   * Inject all of the blocks returned from sourceFSDataset's block reports
+   * into destinationFSDataset.
+   */
+  private void injectBlocksFromBlockReport(SimulatedFSDataset sourceFSDataset,
+      SimulatedFSDataset destinationFSDataset) throws IOException {
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        sourceFSDataset.getBlockReports(bpid).entrySet()) {
+      destinationFSDataset.injectBlocks(bpid, ent.getValue());
+    }
+  }
+
+  /**
+   * Assert that the number of block reports returned from fsdataset matches
+   * {@code storageCount}, and that the total number of blocks is equal to
+   * expectedBlockCount.
+   */
+  private void assertBlockReportCountAndSize(SimulatedFSDataset fsdataset,
+      int expectedBlockCount) {
+    Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
+        fsdataset.getBlockReports(bpid);
+    assertEquals(storageCount, blockReportMap.size());
+    int totalCount = 0;
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        blockReportMap.entrySet()) {
+      totalCount += ent.getValue().getNumberOfBlocks();
+    }
+    assertEquals(expectedBlockCount, totalCount);
+  }
+
+  /**
+   * Convenience method to call {@link #assertBlockLengthInBlockReports(
+   * SimulatedFSDataset,SimulatedFSDataset)} with a null second parameter.
+   */
+  private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset)
+      throws IOException {
+    assertBlockLengthInBlockReports(fsdataset, null);
+  }
+
+  /**
+   * Assert that, for all of the blocks in the block report(s) returned from
+   * fsdataset, they are not null and their length matches the expectation.
+   * If otherFSDataset is non-null, additionally confirm that its idea of the
+   * length of the block matches as well.
+   */
+  private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset,
+      SimulatedFSDataset otherFSDataset) throws IOException {
+    for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
+        fsdataset.getBlockReports(bpid).entrySet()) {
+      for (Block b : ent.getValue()) {
+        assertNotNull(b);
+        assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
+        if (otherFSDataset != null) {
+          assertEquals(blockIdToLen(b.getBlockId()), otherFSDataset
+              .getLength(new ExtendedBlock(bpid, b)));
+        }
+      }
+    }
+  }
+
+  protected SimulatedFSDataset getSimulatedFSDataset() {
     SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
     fsdataset.addBlockPool(bpid, conf);
     return fsdataset;
   }

TestSimulatedFSDatasetWithMultipleStorages.java (new file)

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that the {@link SimulatedFSDataset} works correctly when configured
+ * with multiple storages.
+ */
+public class TestSimulatedFSDatasetWithMultipleStorages
+    extends TestSimulatedFSDataset {
+
+  public TestSimulatedFSDatasetWithMultipleStorages() {
+    super(2);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, "data1,data2");
+  }
+
+  @Test
+  public void testMultipleStoragesConfigured() {
+    SimulatedFSDataset fsDataset = getSimulatedFSDataset();
+    assertEquals(2, fsDataset.getStorageReports(bpid).length);
+  }
+}