HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen.

(cherry picked from commit 94576b17fb)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Erik Krogen 2017-12-18 11:36:22 -08:00 committed by Konstantin V Shvachko
parent 1ef906e29e
commit da4e2f38e1
3 changed files with 350 additions and 150 deletions

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -22,22 +22,25 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import com.google.common.math.LongMath;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -83,6 +86,7 @@ import org.apache.hadoop.util.DataChecksum;
*/
public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public final static int BYTE_MASK = 0xff;
private final static int DEFAULT_NUM_SIMULATED_DATA_DIRS = 1;
static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
@Override
public SimulatedFSDataset newInstance(DataNode datanode,
@@ -96,9 +100,41 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
}
/**
* Used to change the default number of data storages and to mark the
* FSDataset as simulated.
*/
static class TestUtilsFactory
extends FsDatasetTestUtils.Factory<FsDatasetTestUtils> {
@Override
public FsDatasetTestUtils newInstance(DataNode datanode) {
return new FsDatasetImplTestUtils(datanode) {
@Override
public int getDefaultNumOfDataDirs() {
return DEFAULT_NUM_SIMULATED_DATA_DIRS;
}
};
}
@Override
public boolean isSimulated() {
return true;
}
@Override
public int getDefaultNumOfDataDirs() {
return DEFAULT_NUM_SIMULATED_DATA_DIRS;
}
}
public static void setFactory(Configuration conf) {
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
Factory.class.getName());
conf.setClass("org.apache.hadoop.hdfs.server.datanode." +
"SimulatedFSDatasetTestUtilsFactory",
TestUtilsFactory.class, FsDatasetTestUtils.Factory.class
);
}
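// For context — a minimal usage sketch, not part of this patch: tests opt in
// to the simulated dataset by calling setFactory(conf) before starting a
// cluster. MiniDFSCluster/HdfsConfiguration imports and wiring are assumed.
//
//   Configuration conf = new HdfsConfiguration();
//   SimulatedFSDataset.setFactory(conf);
//   MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
//   try {
//     cluster.waitActive();  // blocks written here consume no real disk space
//   } finally {
//     cluster.shutdown();
//   }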
public static byte simulatedByte(Block b, long offsetInBlk) {
@@ -145,7 +181,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
if (theBlock.getNumBytes() < 0) {
theBlock.setNumBytes(0);
}
if (!storage.alloc(bpid, theBlock.getNumBytes())) {
if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) {
// expected length - actual length may
// be more - we find out at finalize
DataNode.LOG.warn("Lack of free storage on a block alloc");
@@ -163,7 +199,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public String getStorageUuid() {
return storage.getStorageUuid();
return getStorage(theBlock).getStorageUuid();
}
@Override
@@ -220,12 +256,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
// adjust if necessary
long extraLen = finalSize - theBlock.getNumBytes();
if (extraLen > 0) {
if (!storage.alloc(bpid,extraLen)) {
if (!getStorage(theBlock).alloc(bpid, extraLen)) {
DataNode.LOG.warn("Lack of free storage on a block alloc");
throw new IOException("Creating block, no free space available");
}
} else {
storage.free(bpid, -extraLen);
getStorage(theBlock).free(bpid, -extraLen);
}
theBlock.setNumBytes(finalSize);
@@ -264,7 +300,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume, fileIoProvider);
getStorage(theBlock).getVolume(), fileIoProvider);
}
}
@@ -339,6 +375,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
*/
private static class SimulatedBPStorage {
private long used; // in bytes
private final Map<Block, BInfo> blockMap = new TreeMap<>();
long getUsed() {
return used;
@@ -352,6 +389,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
used -= amount;
}
Map<Block, BInfo> getBlockMap() {
return blockMap;
}
SimulatedBPStorage() {
used = 0;
}
@@ -363,10 +404,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
*/
private static class SimulatedStorage {
private final Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
new ConcurrentHashMap<>();
private final long capacity; // in bytes
private final DatanodeStorage dnStorage;
private final SimulatedVolume volume;
synchronized long getFree() {
return capacity - getUsed();
@@ -404,11 +446,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
getBPStorage(bpid).free(amount);
}
SimulatedStorage(long cap, DatanodeStorage.State state) {
SimulatedStorage(long cap, DatanodeStorage.State state,
FileIoProvider fileIoProvider, Configuration conf) {
capacity = cap;
dnStorage = new DatanodeStorage(
"SimulatedStorage-" + DatanodeStorage.generateUuid(),
state, StorageType.DEFAULT);
DataNodeVolumeMetrics volumeMetrics =
DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
}
synchronized void addBlockPool(String bpid) {
@@ -444,6 +490,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
false, getCapacity(), getUsed(), getFree(),
map.get(bpid).getUsed(), 0L);
}
SimulatedVolume getVolume() {
return volume;
}
Map<Block, BInfo> getBlockMap(String bpid) throws IOException {
SimulatedBPStorage bpStorage = map.get(bpid);
if (bpStorage == null) {
throw new IOException("Nonexistent block pool: " + bpid);
}
return bpStorage.getBlockMap();
}
}
static class SimulatedVolume implements FsVolumeSpi {
@@ -565,10 +623,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
}
private final Map<String, Map<Block, BInfo>> blockMap
= new ConcurrentHashMap<String, Map<Block,BInfo>>();
private final SimulatedStorage storage;
private final SimulatedVolume volume;
private final List<SimulatedStorage> storages;
private final String datanodeUuid;
private final DataNode datanode;
@@ -579,27 +634,30 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) {
this.datanode = datanode;
if (storage != null) {
int storageCount;
if (storage != null && storage.getNumStorageDirs() > 0) {
storageCount = storage.getNumStorageDirs();
for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
DataStorage.createStorageID(storage.getStorageDir(i), false);
}
this.datanodeUuid = storage.getDatanodeUuid();
} else {
storageCount = DataNode.getStorageLocations(conf).size();
this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
}
registerMBean(datanodeUuid);
this.fileIoProvider = new FileIoProvider(conf, datanode);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
// TODO: per volume id or path
DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf,
datanodeUuid);
this.volume = new SimulatedVolume(this.storage, this.fileIoProvider,
volumeMetrics);
this.datasetLock = new AutoCloseableLock();
this.storages = new ArrayList<>();
for (int i = 0; i < storageCount; i++) {
this.storages.add(new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE),
fileIoProvider, conf));
}
}
public synchronized void injectBlocks(String bpid,
@@ -615,33 +673,50 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new IOException("Block already exists in block list");
}
}
Map<Block, BInfo> map = blockMap.get(bpid);
if (map == null) {
map = new HashMap<Block, BInfo>();
blockMap.put(bpid, map);
for (SimulatedStorage storage : storages) {
storage.addBlockPool(bpid);
}
for (Block b: injectBlocks) {
BInfo binfo = new BInfo(bpid, b, false);
map.put(binfo.theBlock, binfo);
getBlockMap(b, bpid).put(binfo.theBlock, binfo);
}
}
}
/** Get a map for a given block pool Id */
private Map<Block, BInfo> getMap(String bpid) throws IOException {
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map == null) {
throw new IOException("Non existent blockpool " + bpid);
}
return map;
/** Get the storage that a given block lives within. */
private SimulatedStorage getStorage(Block b) {
return storages.get(LongMath.mod(b.getBlockId(), storages.size()));
}
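// Aside, not from this patch: Guava's LongMath.mod is used above because
// Java's % operator returns a negative remainder for negative block IDs,
// which would be an invalid index into the storages list. For example:
//   int idx = LongMath.mod(-7L, 3);  // idx == 2, always within [0, 3)
//   long rem = -7L % 3;              // rem == -1, sign follows the dividend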
/**
* Get the block map that a given block lives within, assuming it is within
* block pool bpid.
* @param b The block to look for
* @param bpid The block pool that contains b
* @return The block map (non-null)
* @throws IOException if bpid does not exist
*/
private Map<Block, BInfo> getBlockMap(Block b, String bpid)
throws IOException {
return getStorage(b).getBlockMap(bpid);
}
/**
* Get the block map that a given block lives within.
* @param b The extended block to look for
* @return The block map (non-null)
* @throws IOException if b is in a nonexistent block pool
*/
private Map<Block, BInfo> getBlockMap(ExtendedBlock b) throws IOException {
return getBlockMap(b.getLocalBlock(), b.getBlockPoolId());
}
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -651,20 +726,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
if (isValidRbw(b)) {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
map.remove(b.getLocalBlock());
getBlockMap(b).remove(b.getLocalBlock());
}
}
synchronized BlockListAsLongs getBlockReport(String bpid) {
synchronized BlockListAsLongs getBlockReport(String bpid,
SimulatedStorage storage) {
BlockListAsLongs.Builder report = BlockListAsLongs.builder();
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
for (BInfo b : map.values()) {
try {
for (BInfo b : storage.getBlockMap(bpid).values()) {
if (b.isFinalized()) {
report.add(b);
}
}
} catch (IOException ioe) {
DataNode.LOG.error("Exception while getting block reports", ioe);
}
return report.build();
}
@@ -672,7 +748,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
String bpid) {
return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid));
Map<DatanodeStorage, BlockListAsLongs> blockReports = new HashMap<>();
for (SimulatedStorage storage : storages) {
blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage));
}
return blockReports;
}
@Override // FsDatasetSpi
@@ -682,27 +762,49 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FSDatasetMBean
public long getCapacity() {
return storage.getCapacity();
long total = 0;
for (SimulatedStorage storage : storages) {
total += storage.getCapacity();
}
return total;
}
@Override // FSDatasetMBean
public long getDfsUsed() {
return storage.getUsed();
long total = 0;
for (SimulatedStorage storage : storages) {
total += storage.getUsed();
}
return total;
}
@Override // FSDatasetMBean
public long getBlockPoolUsed(String bpid) throws IOException {
return storage.getBlockPoolUsed(bpid);
long total = 0;
for (SimulatedStorage storage : storages) {
total += storage.getBlockPoolUsed(bpid);
}
return total;
}
@Override // FSDatasetMBean
public long getRemaining() {
return storage.getFree();
long total = 0;
for (SimulatedStorage storage : storages) {
total += storage.getFree();
}
return total;
}
@Override // FSDatasetMBean
public int getNumFailedVolumes() {
return storage.getNumFailedVolumes();
int total = 0;
for (SimulatedStorage storage : storages) {
total += storage.getNumFailedVolumes();
}
return total;
}
@Override // FSDatasetMBean
@@ -767,8 +869,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized long getLength(ExtendedBlock b) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
@@ -778,34 +879,38 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
@Deprecated
public Replica getReplica(String bpid, long blockId) {
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
return map.get(new Block(blockId));
Block b = new Block(blockId);
try {
return getBlockMap(b, bpid).get(b);
} catch (IOException ioe) {
return null;
}
return null;
}
@Override
public synchronized String getReplicaString(String bpid, long blockId) {
Replica r = null;
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
r = map.get(new Block(blockId));
try {
Block b = new Block(blockId);
r = getBlockMap(b, bpid).get(b);
} catch (IOException ioe) {
// Ignore
}
return r == null? "null": r.toString();
}
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid) throws IOException {
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
BInfo binfo = map.get(new Block(blkid));
Block b = new Block(blkid);
try {
BInfo binfo = getBlockMap(b, bpid).get(b);
if (binfo == null) {
return null;
}
return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes());
} catch (IOException ioe) {
return null;
}
return null;
}
@Override // FsDatasetSpi
@@ -815,18 +920,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
if (invalidBlks == null) {
return;
}
final Map<Block, BInfo> map = getMap(bpid);
for (Block b: invalidBlks) {
if (b == null) {
continue;
}
Map<Block, BInfo> map = getBlockMap(b, bpid);
BInfo binfo = map.get(b);
if (binfo == null) {
error = true;
DataNode.LOG.warn("Invalidate: Missing block");
continue;
}
storage.free(bpid, binfo.getNumBytes());
getStorage(b).free(bpid, binfo.getNumBytes());
map.remove(b);
if (datanode != null) {
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b),
@@ -856,8 +961,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
private BInfo getBInfo(final ExtendedBlock b) {
final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
return map == null? null: map.get(b.getLocalBlock());
try {
return getBlockMap(b).get(b.getLocalBlock());
} catch (IOException ioe) {
return null;
}
}
@Override // {@link FsDatasetSpi}
@@ -921,8 +1029,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler append(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null || !binfo.isFinalized()) {
throw new ReplicaNotFoundException("Block " + b
+ " is not valid, and cannot be appended to.");
@@ -934,7 +1041,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverAppend(
ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -952,7 +1059,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if (binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -971,7 +1078,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
final Map<Block, BInfo> map = getBlockMap(b);
BInfo binfo = map.get(b.getLocalBlock());
if ( binfo == null) {
throw new ReplicaNotFoundException("Block " + b
@@ -1005,16 +1112,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
map.put(binfo.theBlock, binfo);
getBlockMap(b).put(binfo.theBlock, binfo);
return new ReplicaHandler(binfo, null);
}
protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1040,8 +1145,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1229,8 +1333,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
throws IOException {
ExtendedBlock b = rBlock.getBlock();
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
throw new IOException("No such Block " + b );
}
@@ -1245,7 +1348,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
long recoveryId,
long newBlockId,
long newlength) throws IOException {
return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock());
return getBInfo(oldBlock);
}
@Override // FsDatasetSpi
@@ -1255,15 +1358,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public void addBlockPool(String bpid, Configuration conf) {
Map<Block, BInfo> map = new HashMap<Block, BInfo>();
blockMap.put(bpid, map);
storage.addBlockPool(bpid);
for (SimulatedStorage storage : storages) {
storage.addBlockPool(bpid);
}
}
@Override // FsDatasetSpi
public void shutdownBlockPool(String bpid) {
blockMap.remove(bpid);
storage.removeBlockPool(bpid);
for (SimulatedStorage storage : storages) {
storage.removeBlockPool(bpid);
}
}
@Override // FsDatasetSpi
@@ -1274,11 +1378,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
throws IOException {
final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
if (map == null) {
throw new IOException("Block pool not found, temporary=" + temporary);
}
final BInfo r = map.get(temporary.getLocalBlock());
final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock());
if (r == null) {
throw new IOException("Block not found, temporary=" + temporary);
} else if (r.isFinalized()) {
@@ -1329,7 +1429,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeReferences getFsVolumeReferences() {
return new FsVolumeReferences(Collections.singletonList(volume));
List<SimulatedVolume> volumes = new ArrayList<>();
for (SimulatedStorage storage : storages) {
volumes.add(storage.getVolume());
}
return new FsVolumeReferences(volumes);
}
@Override
@@ -1341,14 +1445,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public DatanodeStorage getStorage(final String storageUuid) {
return storageUuid.equals(storage.getStorageUuid()) ?
storage.dnStorage :
null;
for (SimulatedStorage storage : storages) {
if (storageUuid.equals(storage.getStorageUuid())) {
return storage.getDnStorage();
}
}
return null;
}
@Override
public StorageReport[] getStorageReports(String bpid) {
return new StorageReport[] {storage.getStorageReport(bpid)};
List<StorageReport> reports = new ArrayList<>();
for (SimulatedStorage storage : storages) {
reports.add(storage.getStorageReport(bpid));
}
return reports.toArray(new StorageReport[0]);
}
@Override
@@ -1363,7 +1474,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
return volume;
return getStorage(b.getLocalBlock()).getVolume();
}
@Override
@@ -1397,12 +1508,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void setPinning(ExtendedBlock b) throws IOException {
blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
getBlockMap(b).get(b.getLocalBlock()).pinned = true;
}
@Override
public boolean getPinning(ExtendedBlock b) throws IOException {
return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
return getBlockMap(b).get(b.getLocalBlock()).pinned;
}
@Override

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -26,20 +26,19 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.SequentialBlockIdGenerator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -53,6 +52,16 @@ public class TestSimulatedFSDataset {
static final int BLOCK_LENGTH_MULTIPLIER = 79;
static final long FIRST_BLK_ID = 1;
private final int storageCount;
public TestSimulatedFSDataset() {
this(1);
}
protected TestSimulatedFSDataset(int storageCount) {
this.storageCount = storageCount;
}
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
@@ -187,43 +196,28 @@ public class TestSimulatedFSDataset {
@Test
public void testGetBlockReport() throws IOException {
SimulatedFSDataset fsdataset = getSimulatedFSDataset();
BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
assertEquals(0, blockReport.getNumberOfBlocks());
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
assertBlockReportCountAndSize(fsdataset, 0);
addSomeBlocks(fsdataset);
blockReport = fsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
}
assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
assertBlockLengthInBlockReports(fsdataset);
}
@Test
public void testInjectionEmpty() throws IOException {
SimulatedFSDataset fsdataset = getSimulatedFSDataset();
BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
assertEquals(0, blockReport.getNumberOfBlocks());
assertBlockReportCountAndSize(fsdataset, 0);
int bytesAdded = addSomeBlocks(fsdataset);
blockReport = fsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
}
assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
assertBlockLengthInBlockReports(fsdataset);
// Inject blocks into an empty fsdataset
// - injecting the blocks we got above.
SimulatedFSDataset sfsdataset = getSimulatedFSDataset();
sfsdataset.injectBlocks(bpid, blockReport);
blockReport = sfsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
.getLength(new ExtendedBlock(bpid, b)));
}
injectBlocksFromBlockReport(fsdataset, sfsdataset);
assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
assertBlockLengthInBlockReports(fsdataset, sfsdataset);
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
}
@@ -231,16 +225,10 @@ public class TestSimulatedFSDataset {
@Test
public void testInjectionNonEmpty() throws IOException {
SimulatedFSDataset fsdataset = getSimulatedFSDataset();
BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid);
assertEquals(0, blockReport.getNumberOfBlocks());
assertBlockReportCountAndSize(fsdataset, 0);
int bytesAdded = addSomeBlocks(fsdataset);
blockReport = fsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
}
fsdataset = null;
assertBlockReportCountAndSize(fsdataset, NUMBLOCKS);
assertBlockLengthInBlockReports(fsdataset);
// Inject blocks into a non-empty fsdataset
// - injecting the blocks we got above.
@@ -248,19 +236,10 @@ public class TestSimulatedFSDataset {
// Add some blocks whose block ids do not conflict with
// the ones we are going to inject.
bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1, false);
sfsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
sfsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks());
sfsdataset.injectBlocks(bpid, blockReport);
blockReport = sfsdataset.getBlockReport(bpid);
assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks());
for (Block b: blockReport) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
assertEquals(blockIdToLen(b.getBlockId()), sfsdataset
.getLength(new ExtendedBlock(bpid, b)));
}
assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS);
injectBlocksFromBlockReport(fsdataset, sfsdataset);
assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS * 2);
assertBlockLengthInBlockReports(fsdataset, sfsdataset);
assertEquals(bytesAdded, sfsdataset.getDfsUsed());
assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining());
@@ -270,7 +249,7 @@ public class TestSimulatedFSDataset {
try {
sfsdataset = getSimulatedFSDataset();
sfsdataset.addBlockPool(bpid, conf);
sfsdataset.injectBlocks(bpid, blockReport);
injectBlocksFromBlockReport(fsdataset, sfsdataset);
assertTrue("Expected an IO exception", false);
} catch (IOException e) {
// ok - as expected
@@ -334,8 +313,68 @@ public class TestSimulatedFSDataset {
assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b)));
}
}
private SimulatedFSDataset getSimulatedFSDataset() {
/**
* Inject all of the blocks returned from sourceFSDataset's block reports
* into destinationFSDataset.
*/
private void injectBlocksFromBlockReport(SimulatedFSDataset sourceFSDataset,
SimulatedFSDataset destinationFSDataset) throws IOException {
for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
sourceFSDataset.getBlockReports(bpid).entrySet()) {
destinationFSDataset.injectBlocks(bpid, ent.getValue());
}
}
/**
* Assert that the number of block reports returned from fsdataset matches
* {@code storageCount}, and that the total number of blocks is equal to
* expectedBlockCount.
*/
private void assertBlockReportCountAndSize(SimulatedFSDataset fsdataset,
int expectedBlockCount) {
Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
fsdataset.getBlockReports(bpid);
assertEquals(storageCount, blockReportMap.size());
int totalCount = 0;
for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
blockReportMap.entrySet()) {
totalCount += ent.getValue().getNumberOfBlocks();
}
assertEquals(expectedBlockCount, totalCount);
}
/**
* Convenience method to call {@link #assertBlockLengthInBlockReports(
* SimulatedFSDataset,SimulatedFSDataset)} with a null second parameter.
*/
private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset)
throws IOException {
assertBlockLengthInBlockReports(fsdataset, null);
}
/**
* Assert that, for all of the blocks in the block report(s) returned from
* fsdataset, they are not null and their length matches the expectation.
* If otherFSDataset is non-null, additionally confirm that its idea of the
* length of the block matches as well.
*/
private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset,
SimulatedFSDataset otherFSDataset) throws IOException {
for (Map.Entry<DatanodeStorage, BlockListAsLongs> ent :
fsdataset.getBlockReports(bpid).entrySet()) {
for (Block b : ent.getValue()) {
assertNotNull(b);
assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes());
if (otherFSDataset != null) {
assertEquals(blockIdToLen(b.getBlockId()), otherFSDataset
.getLength(new ExtendedBlock(bpid, b)));
}
}
}
}
protected SimulatedFSDataset getSimulatedFSDataset() {
SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf);
fsdataset.addBlockPool(bpid, conf);
return fsdataset;

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java

@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.assertEquals;
/**
* Test that the {@link SimulatedFSDataset} works correctly when configured
* with multiple storages.
*/
public class TestSimulatedFSDatasetWithMultipleStorages
extends TestSimulatedFSDataset {
public TestSimulatedFSDatasetWithMultipleStorages() {
super(2);
}
@Before
public void setUp() throws Exception {
super.setUp();
conf.set(DFS_DATANODE_DATA_DIR_KEY, "data1,data2");
}
@Test
public void testMultipleStoragesConfigured() {
SimulatedFSDataset fsDataset = getSimulatedFSDataset();
assertEquals(2, fsDataset.getStorageReports(bpid).length);
}
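
To exercise the multi-storage path on its own, the new suite can be run directly; a sketch assuming the usual Maven surefire invocation from hadoop-hdfs-project/hadoop-hdfs:

    mvn test -Dtest=TestSimulatedFSDatasetWithMultipleStorages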
}