HDFS-3021. Use generic type to declare FSDatasetInterface.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1295929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-03-01 21:58:39 +00:00
parent 7ae1523865
commit 9e31bf675d
20 changed files with 144 additions and 126 deletions

View File

@ -162,6 +162,8 @@ Release 0.23.3 - UNRELEASED
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
storages. (suresh) storages. (suresh)
HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
IMPROVEMENTS IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place. HDFS-2018. Move all journal stream management code into one place.

View File

@ -74,7 +74,7 @@ class BlockPoolSliceScanner {
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000; private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
private DataNode datanode; private DataNode datanode;
private final FSDatasetInterface dataset; private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
// sorted set // sorted set
private TreeSet<BlockScanInfo> blockInfoSet; private TreeSet<BlockScanInfo> blockInfoSet;
@ -133,7 +133,8 @@ public int compareTo(BlockScanInfo other) {
} }
} }
BlockPoolSliceScanner(DataNode datanode, FSDatasetInterface dataset, BlockPoolSliceScanner(DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
Configuration conf, String bpid) { Configuration conf, String bpid) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
@ -216,7 +217,7 @@ void init() throws IOException {
* otherwise, pick the first directory. * otherwise, pick the first directory.
*/ */
File dir = null; File dir = null;
List<FSVolumeInterface> volumes = dataset.getVolumes(); final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
for (FSVolumeInterface vol : volumes) { for (FSVolumeInterface vol : volumes) {
File bpDir = vol.getDirectory(blockPoolId); File bpDir = vol.getDirectory(blockPoolId);
if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) { if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {

View File

@ -21,7 +21,6 @@
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/************************************************** /**************************************************
@ -34,7 +33,7 @@
* *
***************************************************/ ***************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public interface BlockVolumeChoosingPolicy { public interface BlockVolumeChoosingPolicy<V extends FSVolumeInterface> {
/** /**
* Returns a specific FSVolume after applying a suitable choice algorithm * Returns a specific FSVolume after applying a suitable choice algorithm
@ -48,7 +47,5 @@ public interface BlockVolumeChoosingPolicy {
* @return the chosen volume to store the block. * @return the chosen volume to store the block.
* @throws IOException when disks are unavailable or are full. * @throws IOException when disks are unavailable or are full.
*/ */
public FSVolumeInterface chooseVolume(List<FSVolumeInterface> volumes, long blockSize) public V chooseVolume(List<V> volumes, long blockSize) throws IOException;
throws IOException;
} }

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
/** /**
* DataBlockScanner manages block scanning for all the block pools. For each * DataBlockScanner manages block scanning for all the block pools. For each
@ -44,7 +45,7 @@
public class DataBlockScanner implements Runnable { public class DataBlockScanner implements Runnable {
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
private final DataNode datanode; private final DataNode datanode;
private final FSDatasetInterface dataset; private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
private final Configuration conf; private final Configuration conf;
/** /**
@ -55,7 +56,9 @@ public class DataBlockScanner implements Runnable {
new TreeMap<String, BlockPoolSliceScanner>(); new TreeMap<String, BlockPoolSliceScanner>();
Thread blockScannerThread = null; Thread blockScannerThread = null;
DataBlockScanner(DataNode datanode, FSDatasetInterface dataset, Configuration conf) { DataBlockScanner(DataNode datanode,
FSDatasetInterface<? extends FSVolumeInterface> dataset,
Configuration conf) {
this.datanode = datanode; this.datanode = datanode;
this.dataset = dataset; this.dataset = dataset;
this.conf = conf; this.conf = conf;

View File

@ -123,6 +123,7 @@
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
@ -139,7 +140,6 @@
import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
@ -369,7 +369,7 @@ void refreshNamenodes(Configuration conf)
volatile boolean shouldRun = true; volatile boolean shouldRun = true;
private BlockPoolManager blockPoolManager; private BlockPoolManager blockPoolManager;
public volatile FSDatasetInterface data = null; public volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
private String clusterId = null; private String clusterId = null;
public final static String EMPTY_DEL_HINT = ""; public final static String EMPTY_DEL_HINT = "";
@ -887,7 +887,7 @@ int getBpOsCount() {
* handshake with the the first namenode is completed. * handshake with the the first namenode is completed.
*/ */
private void initStorage(final NamespaceInfo nsInfo) throws IOException { private void initStorage(final NamespaceInfo nsInfo) throws IOException {
final FSDatasetInterface.Factory factory final FSDatasetInterface.Factory<? extends FSDatasetInterface<?>> factory
= FSDatasetInterface.Factory.getFactory(conf); = FSDatasetInterface.Factory.getFactory(conf);
if (!factory.isSimulated()) { if (!factory.isSimulated()) {
@ -1782,11 +1782,11 @@ public void scheduleAllBlockReport(long delay) {
/** /**
* This method is used for testing. * This method is used for testing.
* Examples are adding and deleting blocks directly. * Examples are adding and deleting blocks directly.
* The most common usage will be when the data node's storage is similated. * The most common usage will be when the data node's storage is simulated.
* *
* @return the fsdataset that stores the blocks * @return the fsdataset that stores the blocks
*/ */
public FSDatasetInterface getFSDataset() { FSDatasetInterface<?> getFSDataset() {
return data; return data;
} }

View File

@ -55,7 +55,7 @@ public class DirectoryScanner implements Runnable {
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
private final DataNode datanode; private final DataNode datanode;
private final FSDatasetInterface dataset; private final FSDatasetInterface<?> dataset;
private final ExecutorService reportCompileThreadPool; private final ExecutorService reportCompileThreadPool;
private final ScheduledExecutorService masterThread; private final ScheduledExecutorService masterThread;
private final long scanPeriodMsecs; private final long scanPeriodMsecs;
@ -219,7 +219,7 @@ public long getGenStamp() {
} }
} }
DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) { DirectoryScanner(DataNode dn, FSDatasetInterface<?> dataset, Configuration conf) {
this.datanode = dn; this.datanode = dn;
this.dataset = dataset; this.dataset = dataset;
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
@ -411,7 +411,7 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
} }
/** Is the given volume still valid in the dataset? */ /** Is the given volume still valid in the dataset? */
private static boolean isValid(final FSDatasetInterface dataset, private static boolean isValid(final FSDatasetInterface<?> dataset,
final FSVolumeInterface volume) { final FSVolumeInterface volume) {
for (FSVolumeInterface vol : dataset.getVolumes()) { for (FSVolumeInterface vol : dataset.getVolumes()) {
if (vol == volume) { if (vol == volume) {
@ -424,7 +424,7 @@ private static boolean isValid(final FSDatasetInterface dataset,
/** Get lists of blocks on the disk sorted by blockId, per blockpool */ /** Get lists of blocks on the disk sorted by blockId, per blockpool */
private Map<String, ScanInfo[]> getDiskReport() { private Map<String, ScanInfo[]> getDiskReport() {
// First get list of data directories // First get list of data directories
final List<FSVolumeInterface> volumes = dataset.getVolumes(); final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
ArrayList<ScanInfoPerBlockPool> dirReports = ArrayList<ScanInfoPerBlockPool> dirReports =
new ArrayList<ScanInfoPerBlockPool>(volumes.size()); new ArrayList<ScanInfoPerBlockPool>(volumes.size());

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@ -74,13 +75,13 @@
* *
***************************************************/ ***************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
class FSDataset implements FSDatasetInterface { class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
/** /**
* A factory for creating FSDataset objects. * A factory for creating FSDataset objects.
*/ */
static class Factory extends FSDatasetInterface.Factory { static class Factory extends FSDatasetInterface.Factory<FSDataset> {
@Override @Override
public FSDatasetInterface createFSDatasetInterface(DataNode datanode, public FSDataset createFSDatasetInterface(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException { DataStorage storage, Configuration conf) throws IOException {
return new FSDataset(datanode, storage, conf); return new FSDataset(datanode, storage, conf);
} }
@ -786,13 +787,13 @@ static class FSVolumeSet {
* Read access to this unmodifiable list is not synchronized. * Read access to this unmodifiable list is not synchronized.
* This list is replaced on modification holding "this" lock. * This list is replaced on modification holding "this" lock.
*/ */
private volatile List<FSVolumeInterface> volumes = null; private volatile List<FSVolume> volumes = null;
BlockVolumeChoosingPolicy blockChooser; BlockVolumeChoosingPolicy<FSVolume> blockChooser;
int numFailedVolumes; int numFailedVolumes;
FSVolumeSet(List<FSVolumeInterface> volumes, int failedVols, FSVolumeSet(List<FSVolume> volumes, int failedVols,
BlockVolumeChoosingPolicy blockChooser) { BlockVolumeChoosingPolicy<FSVolume> blockChooser) {
this.volumes = Collections.unmodifiableList(volumes); this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser; this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols; this.numFailedVolumes = failedVols;
@ -810,29 +811,29 @@ private int numberOfFailedVolumes() {
* @return next volume to store the block in. * @return next volume to store the block in.
*/ */
synchronized FSVolume getNextVolume(long blockSize) throws IOException { synchronized FSVolume getNextVolume(long blockSize) throws IOException {
return (FSVolume)blockChooser.chooseVolume(volumes, blockSize); return blockChooser.chooseVolume(volumes, blockSize);
} }
private long getDfsUsed() throws IOException { private long getDfsUsed() throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
dfsUsed += ((FSVolume)v).getDfsUsed(); dfsUsed += v.getDfsUsed();
} }
return dfsUsed; return dfsUsed;
} }
private long getBlockPoolUsed(String bpid) throws IOException { private long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L; long dfsUsed = 0L;
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
dfsUsed += ((FSVolume)v).getBlockPoolUsed(bpid); dfsUsed += v.getBlockPoolUsed(bpid);
} }
return dfsUsed; return dfsUsed;
} }
private long getCapacity() { private long getCapacity() {
long capacity = 0L; long capacity = 0L;
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
capacity += ((FSVolume)v).getCapacity(); capacity += v.getCapacity();
} }
return capacity; return capacity;
} }
@ -845,17 +846,16 @@ private long getRemaining() throws IOException {
return remaining; return remaining;
} }
private void getVolumeMap(ReplicasMap volumeMap) private void getVolumeMap(ReplicasMap volumeMap) throws IOException {
throws IOException { for (FSVolume v : volumes) {
for (FSVolumeInterface v : volumes) { v.getVolumeMap(volumeMap);
((FSVolume)v).getVolumeMap(volumeMap);
} }
} }
private void getVolumeMap(String bpid, ReplicasMap volumeMap) private void getVolumeMap(String bpid, ReplicasMap volumeMap)
throws IOException { throws IOException {
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
((FSVolume)v).getVolumeMap(bpid, volumeMap); v.getVolumeMap(bpid, volumeMap);
} }
} }
@ -871,10 +871,10 @@ private synchronized List<FSVolume> checkDirs() {
ArrayList<FSVolume> removedVols = null; ArrayList<FSVolume> removedVols = null;
// Make a copy of volumes for performing modification // Make a copy of volumes for performing modification
final List<FSVolumeInterface> volumeList = new ArrayList<FSVolumeInterface>(volumes); final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
for (int idx = 0; idx < volumeList.size(); idx++) { for (int idx = 0; idx < volumeList.size(); idx++) {
FSVolume fsv = (FSVolume)volumeList.get(idx); FSVolume fsv = volumeList.get(idx);
try { try {
fsv.checkDirs(); fsv.checkDirs();
} catch (DiskErrorException e) { } catch (DiskErrorException e) {
@ -891,8 +891,8 @@ private synchronized List<FSVolume> checkDirs() {
// Remove null volumes from the volumes array // Remove null volumes from the volumes array
if (removedVols != null && removedVols.size() > 0) { if (removedVols != null && removedVols.size() > 0) {
List<FSVolumeInterface> newVols = new ArrayList<FSVolumeInterface>(); final List<FSVolume> newVols = new ArrayList<FSVolume>();
for (FSVolumeInterface vol : volumeList) { for (FSVolume vol : volumeList) {
if (vol != null) { if (vol != null) {
newVols.add(vol); newVols.add(vol);
} }
@ -914,21 +914,21 @@ public String toString() {
private void addBlockPool(String bpid, Configuration conf) private void addBlockPool(String bpid, Configuration conf)
throws IOException { throws IOException {
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
((FSVolume)v).addBlockPool(bpid, conf); v.addBlockPool(bpid, conf);
} }
} }
private void removeBlockPool(String bpid) { private void removeBlockPool(String bpid) {
for (FSVolumeInterface v : volumes) { for (FSVolume v : volumes) {
((FSVolume)v).shutdownBlockPool(bpid); v.shutdownBlockPool(bpid);
} }
} }
private void shutdown() { private void shutdown() {
for (FSVolumeInterface volume : volumes) { for (FSVolume volume : volumes) {
if(volume != null) { if(volume != null) {
((FSVolume)volume).shutdown(); volume.shutdown();
} }
} }
} }
@ -991,7 +991,7 @@ private static long parseGenerationStamp(File blockFile, File metaFile
} }
@Override // FSDatasetInterface @Override // FSDatasetInterface
public List<FSVolumeInterface> getVolumes() { public List<FSVolume> getVolumes() {
return volumes.volumes; return volumes.volumes;
} }
@ -1099,7 +1099,7 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
+ ", volume failures tolerated: " + volFailuresTolerated); + ", volume failures tolerated: " + volFailuresTolerated);
} }
final List<FSVolumeInterface> volArray = new ArrayList<FSVolumeInterface>( final List<FSVolume> volArray = new ArrayList<FSVolume>(
storage.getNumStorageDirs()); storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir(); final File dir = storage.getStorageDir(idx).getCurrentDir();
@ -1108,12 +1108,12 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
} }
volumeMap = new ReplicasMap(this); volumeMap = new ReplicasMap(this);
BlockVolumeChoosingPolicy blockChooserImpl = @SuppressWarnings("unchecked")
(BlockVolumeChoosingPolicy) ReflectionUtils.newInstance( final BlockVolumeChoosingPolicy<FSVolume> blockChooserImpl =
conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY, ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
RoundRobinVolumesPolicy.class, RoundRobinVolumesPolicy.class,
BlockVolumeChoosingPolicy.class), BlockVolumeChoosingPolicy.class), conf);
conf);
volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl); volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
volumes.getVolumeMap(volumeMap); volumes.getVolumeMap(volumeMap);
@ -2001,7 +2001,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
boolean error = false; boolean error = false;
for (int i = 0; i < invalidBlks.length; i++) { for (int i = 0; i < invalidBlks.length; i++) {
File f = null; File f = null;
FSVolume v; final FSVolume v;
synchronized (this) { synchronized (this) {
f = getFile(bpid, invalidBlks[i].getBlockId()); f = getFile(bpid, invalidBlks[i].getBlockId());
ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]); ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
@ -2553,8 +2553,7 @@ private static class VolumeInfo {
private Collection<VolumeInfo> getVolumeInfo() { private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>(); Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FSVolumeInterface v : volumes.volumes) { for (FSVolume volume : volumes.volumes) {
final FSVolume volume = (FSVolume)v;
long used = 0; long used = 0;
long free = 0; long free = 0;
try { try {
@ -2590,8 +2589,8 @@ public Map<String, Object> getVolumeInfoMap() {
public synchronized void deleteBlockPool(String bpid, boolean force) public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException { throws IOException {
if (!force) { if (!force) {
for (FSVolumeInterface volume : volumes.volumes) { for (FSVolume volume : volumes.volumes) {
if (!((FSVolume)volume).isBPDirEmpty(bpid)) { if (!volume.isBPDirEmpty(bpid)) {
DataNode.LOG.warn(bpid DataNode.LOG.warn(bpid
+ " has some block files, cannot delete unless forced"); + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, " throw new IOException("Cannot delete block pool, "
@ -2599,8 +2598,8 @@ public synchronized void deleteBlockPool(String bpid, boolean force)
} }
} }
} }
for (FSVolumeInterface volume : volumes.volumes) { for (FSVolume volume : volumes.volumes) {
((FSVolume)volume).deleteBPDirectories(bpid, force); volume.deleteBPDirectories(bpid, force);
} }
} }

View File

@ -50,13 +50,15 @@
* *
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface FSDatasetInterface extends FSDatasetMBean { public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterface>
extends FSDatasetMBean {
/** /**
* A factory for creating FSDatasetInterface objects. * A factory for creating FSDatasetInterface objects.
*/ */
public abstract class Factory { public abstract class Factory<D extends FSDatasetInterface<?>> {
/** @return the configured factory. */ /** @return the configured factory. */
public static Factory getFactory(Configuration conf) { public static Factory<?> getFactory(Configuration conf) {
@SuppressWarnings("rawtypes")
final Class<? extends Factory> clazz = conf.getClass( final Class<? extends Factory> clazz = conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
FSDataset.Factory.class, FSDataset.Factory.class,
@ -65,7 +67,7 @@ public static Factory getFactory(Configuration conf) {
} }
/** Create a FSDatasetInterface object. */ /** Create a FSDatasetInterface object. */
public abstract FSDatasetInterface createFSDatasetInterface( public abstract D createFSDatasetInterface(
DataNode datanode, DataStorage storage, Configuration conf DataNode datanode, DataStorage storage, Configuration conf
) throws IOException; ) throws IOException;
@ -94,7 +96,7 @@ interface FSVolumeInterface {
} }
/** @return a list of volumes. */ /** @return a list of volumes. */
public List<FSVolumeInterface> getVolumes(); public List<V> getVolumes();
/** @return a volume information map (name => info). */ /** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap(); public Map<String, Object> getVolumeInfoMap();
@ -234,7 +236,7 @@ static class BlockWriteStreams {
this.checksum = checksum; this.checksum = checksum;
} }
void close() throws IOException { void close() {
IOUtils.closeStream(dataOut); IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut); IOUtils.closeStream(checksumOut);
} }

View File

@ -23,13 +23,14 @@
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy { public class RoundRobinVolumesPolicy<V extends FSVolumeInterface>
implements BlockVolumeChoosingPolicy<V> {
private int curVolume = 0; private int curVolume = 0;
@Override @Override
public synchronized FSVolumeInterface chooseVolume( public synchronized V chooseVolume(final List<V> volumes, final long blockSize
List<FSVolumeInterface> volumes, long blockSize) throws IOException { ) throws IOException {
if(volumes.size() < 1) { if(volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes"); throw new DiskOutOfSpaceException("No more available volumes");
} }
@ -44,7 +45,7 @@ public synchronized FSVolumeInterface chooseVolume(
long maxAvailable = 0; long maxAvailable = 0;
while (true) { while (true) {
FSVolumeInterface volume = volumes.get(curVolume); final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size(); curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable(); long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) { return volume; } if (availableVolumeSize > blockSize) { return volume; }

View File

@ -1565,8 +1565,8 @@ public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport( final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
bpid); return DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid);
} }
@ -1598,7 +1598,8 @@ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) thro
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
if (!(dataSet instanceof SimulatedFSDataset)) { if (!(dataSet instanceof SimulatedFSDataset)) {
throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
} }
@ -1616,7 +1617,8 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException(); throw new IndexOutOfBoundsException();
} }
FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
if (!(dataSet instanceof SimulatedFSDataset)) { if (!(dataSet instanceof SimulatedFSDataset)) {
throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
} }

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
public class TestDFSRemove extends junit.framework.TestCase { public class TestDFSRemove extends junit.framework.TestCase {
final Path dir = new Path("/test/remove/"); final Path dir = new Path("/test/remove/");
@ -45,7 +46,7 @@ static void createFile(FileSystem fs, Path f) throws IOException {
static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException { static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
long total = 0; long total = 0;
for(DataNode node : cluster.getDataNodes()) { for(DataNode node : cluster.getDataNodes()) {
total += node.getFSDataset().getDfsUsed(); total += DataNodeTestUtils.getFSDataset(node).getDfsUsed();
} }
return total; return total;
} }

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager; import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
@ -210,8 +211,10 @@ public void testFileCreation() throws IOException {
// can't check capacities for real storage since the OS file system may be changing under us. // can't check capacities for real storage since the OS file system may be changing under us.
if (simulatedStorage) { if (simulatedStorage) {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
assertEquals(fileSize, dn.getFSDataset().getDfsUsed()); FSDatasetInterface<?> dataset = DataNodeTestUtils.getFSDataset(dn);
assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining()); assertEquals(fileSize, dataset.getDfsUsed());
assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize,
dataset.getRemaining());
} }
} finally { } finally {
cluster.shutdown(); cluster.shutdown();

View File

@ -41,6 +41,17 @@ public class DataNodeTestUtils {
return dn.getDNRegistrationForBP(bpid); return dn.getDNRegistrationForBP(bpid);
} }
/**
* This method is used for testing.
* Examples are adding and deleting blocks directly.
* The most common usage will be when the data node's storage is simulated.
*
* @return the fsdataset that stores the blocks
*/
public static FSDatasetInterface<?> getFSDataset(DataNode dn) {
return dn.getFSDataset();
}
public static File getFile(DataNode dn, String bpid, long bid) { public static File getFile(DataNode dn, String bpid, long bid) {
return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid); return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
} }

View File

@ -61,10 +61,11 @@
* *
* Note the synchronization is coarse grained - it is at each method. * Note the synchronization is coarse grained - it is at each method.
*/ */
public class SimulatedFSDataset implements FSDatasetInterface { public class SimulatedFSDataset
static class Factory extends FSDatasetInterface.Factory { implements FSDatasetInterface<FSDatasetInterface.FSVolumeInterface> {
static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
@Override @Override
public FSDatasetInterface createFSDatasetInterface(DataNode datanode, public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException { DataStorage storage, Configuration conf) throws IOException {
return new SimulatedFSDataset(datanode, storage, conf); return new SimulatedFSDataset(datanode, storage, conf);
} }

View File

@ -210,13 +210,14 @@ public void blockReport_02() throws IOException {
LOG.debug("Number of blocks allocated " + lBlocks.size()); LOG.debug("Number of blocks allocated " + lBlocks.size());
} }
final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
for (ExtendedBlock b : blocks2Remove) { for (ExtendedBlock b : blocks2Remove) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Removing the block " + b.getBlockName()); LOG.debug("Removing the block " + b.getBlockName());
} }
for (File f : findAllFiles(dataDir, for (File f : findAllFiles(dataDir,
new MyFileFilter(b.getBlockName(), true))) { new MyFileFilter(b.getBlockName(), true))) {
cluster.getDataNodes().get(DN_N0).getFSDataset().unfinalizeBlock(b); DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
if (!f.delete()) if (!f.delete())
LOG.warn("Couldn't delete " + b.getBlockName()); LOG.warn("Couldn't delete " + b.getBlockName());
} }
@ -225,9 +226,8 @@ public void blockReport_02() throws IOException {
waitTil(DN_RESCAN_EXTRA_WAIT); waitTil(DN_RESCAN_EXTRA_WAIT);
// all blocks belong to the same file, hence same BP // all blocks belong to the same file, hence same BP
DataNode dn = cluster.getDataNodes().get(DN_N0);
String poolId = cluster.getNamesystem().getBlockPoolId(); String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
cluster.getNameNodeRpc().blockReport(dnR, poolId, report); cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
@ -602,15 +602,15 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
cluster.waitActive(); cluster.waitActive();
// Look about specified DN for the replica of the block from 1st DN // Look about specified DN for the replica of the block from 1st DN
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
fetchReplicaInfo(bpid, bl.getBlockId());
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
int count = 0; int count = 0;
while (r == null) { while (r == null) {
waitTil(5); waitTil(5);
r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
fetchReplicaInfo(bpid, bl.getBlockId());
long waiting_period = System.currentTimeMillis() - start; long waiting_period = System.currentTimeMillis() - start;
if (count++ % 100 == 0) if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {

View File

@ -145,8 +145,11 @@ public void testVolumeFailure() throws IOException {
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3 DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
String bpid = cluster.getNamesystem().getBlockPoolId(); String bpid = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), final StorageBlockReport[] report = {
dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) }; new StorageBlockReport(dnR.getStorageID(),
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
).getBlockListAsLongs())
};
cluster.getNameNodeRpc().blockReport(dnR, bpid, report); cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
// verify number of blocks and files... // verify number of blocks and files...

View File

@ -24,11 +24,7 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -38,7 +34,6 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -47,12 +42,6 @@
* Test the ability of a DN to tolerate volume failures. * Test the ability of a DN to tolerate volume failures.
*/ */
public class TestDataNodeVolumeFailureToleration { public class TestDataNodeVolumeFailureToleration {
private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureToleration.class);
{
((Log4JLogger)TestDataNodeVolumeFailureToleration.LOG).getLogger().setLevel(Level.ALL);
}
private FileSystem fs; private FileSystem fs;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private Configuration conf; private Configuration conf;
@ -130,7 +119,7 @@ public void testValidVolumesAtStartup() throws Exception {
assertTrue("The DN should have started up fine.", assertTrue("The DN should have started up fine.",
cluster.isDataNodeUp()); cluster.isDataNodeUp());
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
String si = dn.getFSDataset().getStorageInfo(); String si = DataNodeTestUtils.getFSDataset(dn).getStorageInfo();
assertTrue("The DN should have started with this directory", assertTrue("The DN should have started with this directory",
si.contains(dataDir1Actual.getPath())); si.contains(dataDir1Actual.getPath()));
assertFalse("The DN shouldn't have a bad directory.", assertFalse("The DN shouldn't have a bad directory.",
@ -227,7 +216,7 @@ public void testVolumeAndTolerableConfiguration() throws Exception {
*/ */
private void testVolumeConfig(int volumesTolerated, int volumesFailed, private void testVolumeConfig(int volumesTolerated, int volumesFailed,
boolean expectedBPServiceState, boolean manageDfsDirs) boolean expectedBPServiceState, boolean manageDfsDirs)
throws IOException, InterruptedException, TimeoutException { throws IOException, InterruptedException {
assumeTrue(!System.getProperty("os.name").startsWith("Windows")); assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
final int dnIndex = 0; final int dnIndex = 0;
// Fail the current directory since invalid storage directory perms // Fail the current directory since invalid storage directory perms

View File

@ -38,7 +38,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
/** /**
* Tests {@link DirectoryScanner} handling of differences * Tests {@link DirectoryScanner} handling of differences
@ -142,7 +142,7 @@ private String getMetaFile(long id) {
/** Create a block file in a random volume*/ /** Create a block file in a random volume*/
private long createBlockFile() throws IOException { private long createBlockFile() throws IOException {
List<FSVolumeInterface> volumes = fds.getVolumes(); List<FSVolume> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@ -155,7 +155,7 @@ private long createBlockFile() throws IOException {
/** Create a metafile in a random volume*/ /** Create a metafile in a random volume*/
private long createMetaFile() throws IOException { private long createMetaFile() throws IOException {
List<FSVolumeInterface> volumes = fds.getVolumes(); List<FSVolume> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@ -168,7 +168,7 @@ private long createMetaFile() throws IOException {
/** Create block file and corresponding metafile in a rondom volume */ /** Create block file and corresponding metafile in a rondom volume */
private long createBlockMetaFile() throws IOException { private long createBlockMetaFile() throws IOException {
List<FSVolumeInterface> volumes = fds.getVolumes(); List<FSVolume> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1); int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@ -228,7 +228,8 @@ public void runTest(int parallelism) throws Exception {
try { try {
cluster.waitActive(); cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId(); bpid = cluster.getNamesystem().getBlockPoolId();
fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset(); fds = (FSDataset)DataNodeTestUtils.getFSDataset(
cluster.getDataNodes().get(0));
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism); parallelism);
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);

View File

@ -43,8 +43,10 @@ public void testRR() throws Exception {
volumes.add(Mockito.mock(FSVolumeInterface.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance( @SuppressWarnings("unchecked")
RoundRobinVolumesPolicy.class, null); final RoundRobinVolumesPolicy<FSVolumeInterface> policy =
(RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
RoundRobinVolumesPolicy.class, null);
// Test two rounds of round-robin choosing // Test two rounds of round-robin choosing
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
@ -79,7 +81,8 @@ public void testRRPolicyExceptionMessage()
volumes.add(Mockito.mock(FSVolumeInterface.class)); volumes.add(Mockito.mock(FSVolumeInterface.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy(); final RoundRobinVolumesPolicy<FSVolumeInterface> policy
= new RoundRobinVolumesPolicy<FSVolumeInterface>();
int blockSize = 700; int blockSize = 700;
try { try {
policy.chooseVolume(volumes, blockSize); policy.chooseVolume(volumes, blockSize);

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -28,8 +29,6 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -56,7 +55,7 @@ long blockIdToLen(long blkid) {
return blkid*BLOCK_LENGTH_MULTIPLIER; return blkid*BLOCK_LENGTH_MULTIPLIER;
} }
int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId)
throws IOException { throws IOException {
int bytesAdded = 0; int bytesAdded = 0;
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) { for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
@ -83,24 +82,24 @@ int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId)
} }
return bytesAdded; return bytesAdded;
} }
int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException { int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException {
return addSomeBlocks(fsdataset, 1); return addSomeBlocks(fsdataset, 1);
} }
public void testFSDatasetFactory() { public void testFSDatasetFactory() {
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf); FSDatasetInterface.Factory<?> f = FSDatasetInterface.Factory.getFactory(conf);
assertEquals(FSDataset.Factory.class, f.getClass()); assertEquals(FSDataset.Factory.class, f.getClass());
assertFalse(f.isSimulated()); assertFalse(f.isSimulated());
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf); FSDatasetInterface.Factory<?> s = FSDatasetInterface.Factory.getFactory(conf);
assertEquals(SimulatedFSDataset.Factory.class, s.getClass()); assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
assertTrue(s.isSimulated()); assertTrue(s.isSimulated());
} }
public void testGetMetaData() throws IOException { public void testGetMetaData() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
try { try {
assertFalse(fsdataset.metaFileExists(b)); assertFalse(fsdataset.metaFileExists(b));
@ -121,7 +120,7 @@ public void testGetMetaData() throws IOException {
public void testStorageUsage() throws IOException { public void testStorageUsage() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
assertEquals(fsdataset.getDfsUsed(), 0); assertEquals(fsdataset.getDfsUsed(), 0);
assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity()); assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity());
int bytesAdded = addSomeBlocks(fsdataset); int bytesAdded = addSomeBlocks(fsdataset);
@ -131,7 +130,7 @@ public void testStorageUsage() throws IOException {
void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b, void checkBlockDataAndSize(SimulatedFSDataset fsdataset, ExtendedBlock b,
long expectedLen) throws IOException { long expectedLen) throws IOException {
InputStream input = fsdataset.getBlockInputStream(b); InputStream input = fsdataset.getBlockInputStream(b);
long lengthRead = 0; long lengthRead = 0;
@ -144,7 +143,7 @@ void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b,
} }
public void testWriteRead() throws IOException { public void testWriteRead() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
addSomeBlocks(fsdataset); addSomeBlocks(fsdataset);
for (int i=1; i <= NUMBLOCKS; ++i) { for (int i=1; i <= NUMBLOCKS; ++i) {
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
@ -244,7 +243,7 @@ public void testInjectionNonEmpty() throws IOException {
} }
public void checkInvalidBlock(ExtendedBlock b) throws IOException { public void checkInvalidBlock(ExtendedBlock b) throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
assertFalse(fsdataset.isValidBlock(b)); assertFalse(fsdataset.isValidBlock(b));
try { try {
fsdataset.getLength(b); fsdataset.getLength(b);
@ -269,7 +268,7 @@ public void checkInvalidBlock(ExtendedBlock b) throws IOException {
} }
public void testInValidBlocks() throws IOException { public void testInValidBlocks() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
checkInvalidBlock(b); checkInvalidBlock(b);
@ -280,7 +279,7 @@ public void testInValidBlocks() throws IOException {
} }
public void testInvalidate() throws IOException { public void testInvalidate() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
int bytesAdded = addSomeBlocks(fsdataset); int bytesAdded = addSomeBlocks(fsdataset);
Block[] deleteBlocks = new Block[2]; Block[] deleteBlocks = new Block[2];
deleteBlocks[0] = new Block(1, 0, 0); deleteBlocks[0] = new Block(1, 0, 0);