svn merge -c 1295929 from trunk for HDFS-3021.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1295932 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d0de078933
commit
d93c0b72a2
|
@ -45,6 +45,8 @@ Release 0.23.3 - UNRELEASED
|
||||||
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
|
HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
|
||||||
storages. (suresh)
|
storages. (suresh)
|
||||||
|
|
||||||
|
HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2018. Move all journal stream management code into one place.
|
HDFS-2018. Move all journal stream management code into one place.
|
||||||
|
|
|
@ -74,7 +74,7 @@ class BlockPoolSliceScanner {
|
||||||
|
|
||||||
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
|
private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
|
||||||
private DataNode datanode;
|
private DataNode datanode;
|
||||||
private final FSDatasetInterface dataset;
|
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
|
||||||
|
|
||||||
// sorted set
|
// sorted set
|
||||||
private TreeSet<BlockScanInfo> blockInfoSet;
|
private TreeSet<BlockScanInfo> blockInfoSet;
|
||||||
|
@ -133,7 +133,8 @@ class BlockPoolSliceScanner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockPoolSliceScanner(DataNode datanode, FSDatasetInterface dataset,
|
BlockPoolSliceScanner(DataNode datanode,
|
||||||
|
FSDatasetInterface<? extends FSVolumeInterface> dataset,
|
||||||
Configuration conf, String bpid) {
|
Configuration conf, String bpid) {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.dataset = dataset;
|
this.dataset = dataset;
|
||||||
|
@ -216,7 +217,7 @@ class BlockPoolSliceScanner {
|
||||||
* otherwise, pick the first directory.
|
* otherwise, pick the first directory.
|
||||||
*/
|
*/
|
||||||
File dir = null;
|
File dir = null;
|
||||||
List<FSVolumeInterface> volumes = dataset.getVolumes();
|
final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
|
||||||
for (FSVolumeInterface vol : volumes) {
|
for (FSVolumeInterface vol : volumes) {
|
||||||
File bpDir = vol.getDirectory(blockPoolId);
|
File bpDir = vol.getDirectory(blockPoolId);
|
||||||
if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
|
if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
|
||||||
|
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||||
|
|
||||||
/**************************************************
|
/**************************************************
|
||||||
|
@ -34,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterfa
|
||||||
*
|
*
|
||||||
***************************************************/
|
***************************************************/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface BlockVolumeChoosingPolicy {
|
public interface BlockVolumeChoosingPolicy<V extends FSVolumeInterface> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a specific FSVolume after applying a suitable choice algorithm
|
* Returns a specific FSVolume after applying a suitable choice algorithm
|
||||||
|
@ -48,7 +47,5 @@ public interface BlockVolumeChoosingPolicy {
|
||||||
* @return the chosen volume to store the block.
|
* @return the chosen volume to store the block.
|
||||||
* @throws IOException when disks are unavailable or are full.
|
* @throws IOException when disks are unavailable or are full.
|
||||||
*/
|
*/
|
||||||
public FSVolumeInterface chooseVolume(List<FSVolumeInterface> volumes, long blockSize)
|
public V chooseVolume(List<V> volumes, long blockSize) throws IOException;
|
||||||
throws IOException;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DataBlockScanner manages block scanning for all the block pools. For each
|
* DataBlockScanner manages block scanning for all the block pools. For each
|
||||||
|
@ -44,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
public class DataBlockScanner implements Runnable {
|
public class DataBlockScanner implements Runnable {
|
||||||
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
|
public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
private final FSDatasetInterface dataset;
|
private final FSDatasetInterface<? extends FSVolumeInterface> dataset;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -55,7 +56,9 @@ public class DataBlockScanner implements Runnable {
|
||||||
new TreeMap<String, BlockPoolSliceScanner>();
|
new TreeMap<String, BlockPoolSliceScanner>();
|
||||||
Thread blockScannerThread = null;
|
Thread blockScannerThread = null;
|
||||||
|
|
||||||
DataBlockScanner(DataNode datanode, FSDatasetInterface dataset, Configuration conf) {
|
DataBlockScanner(DataNode datanode,
|
||||||
|
FSDatasetInterface<? extends FSVolumeInterface> dataset,
|
||||||
|
Configuration conf) {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.dataset = dataset;
|
this.dataset = dataset;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
|
|
@ -122,6 +122,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
|
||||||
|
@ -139,7 +140,6 @@ import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
import org.apache.hadoop.http.HttpServer;
|
import org.apache.hadoop.http.HttpServer;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -370,7 +370,7 @@ public class DataNode extends Configured
|
||||||
|
|
||||||
volatile boolean shouldRun = true;
|
volatile boolean shouldRun = true;
|
||||||
private BlockPoolManager blockPoolManager;
|
private BlockPoolManager blockPoolManager;
|
||||||
public volatile FSDatasetInterface data = null;
|
public volatile FSDatasetInterface<? extends FSVolumeInterface> data = null;
|
||||||
private String clusterId = null;
|
private String clusterId = null;
|
||||||
|
|
||||||
public final static String EMPTY_DEL_HINT = "";
|
public final static String EMPTY_DEL_HINT = "";
|
||||||
|
@ -889,7 +889,7 @@ public class DataNode extends Configured
|
||||||
* handshake with the the first namenode is completed.
|
* handshake with the the first namenode is completed.
|
||||||
*/
|
*/
|
||||||
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
|
private void initStorage(final NamespaceInfo nsInfo) throws IOException {
|
||||||
final FSDatasetInterface.Factory factory
|
final FSDatasetInterface.Factory<? extends FSDatasetInterface<?>> factory
|
||||||
= FSDatasetInterface.Factory.getFactory(conf);
|
= FSDatasetInterface.Factory.getFactory(conf);
|
||||||
|
|
||||||
if (!factory.isSimulated()) {
|
if (!factory.isSimulated()) {
|
||||||
|
@ -1776,11 +1776,11 @@ public class DataNode extends Configured
|
||||||
/**
|
/**
|
||||||
* This method is used for testing.
|
* This method is used for testing.
|
||||||
* Examples are adding and deleting blocks directly.
|
* Examples are adding and deleting blocks directly.
|
||||||
* The most common usage will be when the data node's storage is similated.
|
* The most common usage will be when the data node's storage is simulated.
|
||||||
*
|
*
|
||||||
* @return the fsdataset that stores the blocks
|
* @return the fsdataset that stores the blocks
|
||||||
*/
|
*/
|
||||||
public FSDatasetInterface getFSDataset() {
|
FSDatasetInterface<?> getFSDataset() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
|
private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
|
||||||
|
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
private final FSDatasetInterface dataset;
|
private final FSDatasetInterface<?> dataset;
|
||||||
private final ExecutorService reportCompileThreadPool;
|
private final ExecutorService reportCompileThreadPool;
|
||||||
private final ScheduledExecutorService masterThread;
|
private final ScheduledExecutorService masterThread;
|
||||||
private final long scanPeriodMsecs;
|
private final long scanPeriodMsecs;
|
||||||
|
@ -219,7 +219,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) {
|
DirectoryScanner(DataNode dn, FSDatasetInterface<?> dataset, Configuration conf) {
|
||||||
this.datanode = dn;
|
this.datanode = dn;
|
||||||
this.dataset = dataset;
|
this.dataset = dataset;
|
||||||
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
|
int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
|
||||||
|
@ -411,7 +411,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Is the given volume still valid in the dataset? */
|
/** Is the given volume still valid in the dataset? */
|
||||||
private static boolean isValid(final FSDatasetInterface dataset,
|
private static boolean isValid(final FSDatasetInterface<?> dataset,
|
||||||
final FSVolumeInterface volume) {
|
final FSVolumeInterface volume) {
|
||||||
for (FSVolumeInterface vol : dataset.getVolumes()) {
|
for (FSVolumeInterface vol : dataset.getVolumes()) {
|
||||||
if (vol == volume) {
|
if (vol == volume) {
|
||||||
|
@ -424,7 +424,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
|
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
|
||||||
private Map<String, ScanInfo[]> getDiskReport() {
|
private Map<String, ScanInfo[]> getDiskReport() {
|
||||||
// First get list of data directories
|
// First get list of data directories
|
||||||
final List<FSVolumeInterface> volumes = dataset.getVolumes();
|
final List<? extends FSVolumeInterface> volumes = dataset.getVolumes();
|
||||||
ArrayList<ScanInfoPerBlockPool> dirReports =
|
ArrayList<ScanInfoPerBlockPool> dirReports =
|
||||||
new ArrayList<ScanInfoPerBlockPool>(volumes.size());
|
new ArrayList<ScanInfoPerBlockPool>(volumes.size());
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
|
@ -74,13 +75,13 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
*
|
*
|
||||||
***************************************************/
|
***************************************************/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class FSDataset implements FSDatasetInterface {
|
class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
||||||
/**
|
/**
|
||||||
* A factory for creating FSDataset objects.
|
* A factory for creating FSDataset objects.
|
||||||
*/
|
*/
|
||||||
static class Factory extends FSDatasetInterface.Factory {
|
static class Factory extends FSDatasetInterface.Factory<FSDataset> {
|
||||||
@Override
|
@Override
|
||||||
public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
|
public FSDataset createFSDatasetInterface(DataNode datanode,
|
||||||
DataStorage storage, Configuration conf) throws IOException {
|
DataStorage storage, Configuration conf) throws IOException {
|
||||||
return new FSDataset(datanode, storage, conf);
|
return new FSDataset(datanode, storage, conf);
|
||||||
}
|
}
|
||||||
|
@ -786,13 +787,13 @@ class FSDataset implements FSDatasetInterface {
|
||||||
* Read access to this unmodifiable list is not synchronized.
|
* Read access to this unmodifiable list is not synchronized.
|
||||||
* This list is replaced on modification holding "this" lock.
|
* This list is replaced on modification holding "this" lock.
|
||||||
*/
|
*/
|
||||||
private volatile List<FSVolumeInterface> volumes = null;
|
private volatile List<FSVolume> volumes = null;
|
||||||
|
|
||||||
BlockVolumeChoosingPolicy blockChooser;
|
BlockVolumeChoosingPolicy<FSVolume> blockChooser;
|
||||||
int numFailedVolumes;
|
int numFailedVolumes;
|
||||||
|
|
||||||
FSVolumeSet(List<FSVolumeInterface> volumes, int failedVols,
|
FSVolumeSet(List<FSVolume> volumes, int failedVols,
|
||||||
BlockVolumeChoosingPolicy blockChooser) {
|
BlockVolumeChoosingPolicy<FSVolume> blockChooser) {
|
||||||
this.volumes = Collections.unmodifiableList(volumes);
|
this.volumes = Collections.unmodifiableList(volumes);
|
||||||
this.blockChooser = blockChooser;
|
this.blockChooser = blockChooser;
|
||||||
this.numFailedVolumes = failedVols;
|
this.numFailedVolumes = failedVols;
|
||||||
|
@ -810,29 +811,29 @@ class FSDataset implements FSDatasetInterface {
|
||||||
* @return next volume to store the block in.
|
* @return next volume to store the block in.
|
||||||
*/
|
*/
|
||||||
synchronized FSVolume getNextVolume(long blockSize) throws IOException {
|
synchronized FSVolume getNextVolume(long blockSize) throws IOException {
|
||||||
return (FSVolume)blockChooser.chooseVolume(volumes, blockSize);
|
return blockChooser.chooseVolume(volumes, blockSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getDfsUsed() throws IOException {
|
private long getDfsUsed() throws IOException {
|
||||||
long dfsUsed = 0L;
|
long dfsUsed = 0L;
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
dfsUsed += ((FSVolume)v).getDfsUsed();
|
dfsUsed += v.getDfsUsed();
|
||||||
}
|
}
|
||||||
return dfsUsed;
|
return dfsUsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getBlockPoolUsed(String bpid) throws IOException {
|
private long getBlockPoolUsed(String bpid) throws IOException {
|
||||||
long dfsUsed = 0L;
|
long dfsUsed = 0L;
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
dfsUsed += ((FSVolume)v).getBlockPoolUsed(bpid);
|
dfsUsed += v.getBlockPoolUsed(bpid);
|
||||||
}
|
}
|
||||||
return dfsUsed;
|
return dfsUsed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getCapacity() throws IOException {
|
private long getCapacity() throws IOException {
|
||||||
long capacity = 0L;
|
long capacity = 0L;
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
capacity += ((FSVolume)v).getCapacity();
|
capacity += v.getCapacity();
|
||||||
}
|
}
|
||||||
return capacity;
|
return capacity;
|
||||||
}
|
}
|
||||||
|
@ -845,17 +846,16 @@ class FSDataset implements FSDatasetInterface {
|
||||||
return remaining;
|
return remaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getVolumeMap(ReplicasMap volumeMap)
|
private void getVolumeMap(ReplicasMap volumeMap) throws IOException {
|
||||||
throws IOException {
|
for (FSVolume v : volumes) {
|
||||||
for (FSVolumeInterface v : volumes) {
|
v.getVolumeMap(volumeMap);
|
||||||
((FSVolume)v).getVolumeMap(volumeMap);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getVolumeMap(String bpid, ReplicasMap volumeMap)
|
private void getVolumeMap(String bpid, ReplicasMap volumeMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
((FSVolume)v).getVolumeMap(bpid, volumeMap);
|
v.getVolumeMap(bpid, volumeMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -871,10 +871,10 @@ class FSDataset implements FSDatasetInterface {
|
||||||
ArrayList<FSVolume> removedVols = null;
|
ArrayList<FSVolume> removedVols = null;
|
||||||
|
|
||||||
// Make a copy of volumes for performing modification
|
// Make a copy of volumes for performing modification
|
||||||
final List<FSVolumeInterface> volumeList = new ArrayList<FSVolumeInterface>(volumes);
|
final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
|
||||||
|
|
||||||
for (int idx = 0; idx < volumeList.size(); idx++) {
|
for (int idx = 0; idx < volumeList.size(); idx++) {
|
||||||
FSVolume fsv = (FSVolume)volumeList.get(idx);
|
FSVolume fsv = volumeList.get(idx);
|
||||||
try {
|
try {
|
||||||
fsv.checkDirs();
|
fsv.checkDirs();
|
||||||
} catch (DiskErrorException e) {
|
} catch (DiskErrorException e) {
|
||||||
|
@ -891,8 +891,8 @@ class FSDataset implements FSDatasetInterface {
|
||||||
|
|
||||||
// Remove null volumes from the volumes array
|
// Remove null volumes from the volumes array
|
||||||
if (removedVols != null && removedVols.size() > 0) {
|
if (removedVols != null && removedVols.size() > 0) {
|
||||||
List<FSVolumeInterface> newVols = new ArrayList<FSVolumeInterface>();
|
final List<FSVolume> newVols = new ArrayList<FSVolume>();
|
||||||
for (FSVolumeInterface vol : volumeList) {
|
for (FSVolume vol : volumeList) {
|
||||||
if (vol != null) {
|
if (vol != null) {
|
||||||
newVols.add(vol);
|
newVols.add(vol);
|
||||||
}
|
}
|
||||||
|
@ -914,21 +914,21 @@ class FSDataset implements FSDatasetInterface {
|
||||||
|
|
||||||
private void addBlockPool(String bpid, Configuration conf)
|
private void addBlockPool(String bpid, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
((FSVolume)v).addBlockPool(bpid, conf);
|
v.addBlockPool(bpid, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeBlockPool(String bpid) {
|
private void removeBlockPool(String bpid) {
|
||||||
for (FSVolumeInterface v : volumes) {
|
for (FSVolume v : volumes) {
|
||||||
((FSVolume)v).shutdownBlockPool(bpid);
|
v.shutdownBlockPool(bpid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void shutdown() {
|
private void shutdown() {
|
||||||
for (FSVolumeInterface volume : volumes) {
|
for (FSVolume volume : volumes) {
|
||||||
if(volume != null) {
|
if(volume != null) {
|
||||||
((FSVolume)volume).shutdown();
|
volume.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -991,7 +991,7 @@ class FSDataset implements FSDatasetInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // FSDatasetInterface
|
@Override // FSDatasetInterface
|
||||||
public List<FSVolumeInterface> getVolumes() {
|
public List<FSVolume> getVolumes() {
|
||||||
return volumes.volumes;
|
return volumes.volumes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1099,7 +1099,7 @@ class FSDataset implements FSDatasetInterface {
|
||||||
+ ", volume failures tolerated: " + volFailuresTolerated);
|
+ ", volume failures tolerated: " + volFailuresTolerated);
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<FSVolumeInterface> volArray = new ArrayList<FSVolumeInterface>(
|
final List<FSVolume> volArray = new ArrayList<FSVolume>(
|
||||||
storage.getNumStorageDirs());
|
storage.getNumStorageDirs());
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
final File dir = storage.getStorageDir(idx).getCurrentDir();
|
||||||
|
@ -1108,12 +1108,12 @@ class FSDataset implements FSDatasetInterface {
|
||||||
}
|
}
|
||||||
volumeMap = new ReplicasMap(this);
|
volumeMap = new ReplicasMap(this);
|
||||||
|
|
||||||
BlockVolumeChoosingPolicy blockChooserImpl =
|
@SuppressWarnings("unchecked")
|
||||||
(BlockVolumeChoosingPolicy) ReflectionUtils.newInstance(
|
final BlockVolumeChoosingPolicy<FSVolume> blockChooserImpl =
|
||||||
conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
|
ReflectionUtils.newInstance(conf.getClass(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
|
||||||
RoundRobinVolumesPolicy.class,
|
RoundRobinVolumesPolicy.class,
|
||||||
BlockVolumeChoosingPolicy.class),
|
BlockVolumeChoosingPolicy.class), conf);
|
||||||
conf);
|
|
||||||
volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
|
volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
|
||||||
volumes.getVolumeMap(volumeMap);
|
volumes.getVolumeMap(volumeMap);
|
||||||
|
|
||||||
|
@ -2001,7 +2001,7 @@ class FSDataset implements FSDatasetInterface {
|
||||||
boolean error = false;
|
boolean error = false;
|
||||||
for (int i = 0; i < invalidBlks.length; i++) {
|
for (int i = 0; i < invalidBlks.length; i++) {
|
||||||
File f = null;
|
File f = null;
|
||||||
FSVolume v;
|
final FSVolume v;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
f = getFile(bpid, invalidBlks[i].getBlockId());
|
f = getFile(bpid, invalidBlks[i].getBlockId());
|
||||||
ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
|
ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
|
||||||
|
@ -2553,8 +2553,7 @@ class FSDataset implements FSDatasetInterface {
|
||||||
|
|
||||||
private Collection<VolumeInfo> getVolumeInfo() {
|
private Collection<VolumeInfo> getVolumeInfo() {
|
||||||
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
|
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
|
||||||
for (FSVolumeInterface v : volumes.volumes) {
|
for (FSVolume volume : volumes.volumes) {
|
||||||
final FSVolume volume = (FSVolume)v;
|
|
||||||
long used = 0;
|
long used = 0;
|
||||||
long free = 0;
|
long free = 0;
|
||||||
try {
|
try {
|
||||||
|
@ -2590,8 +2589,8 @@ class FSDataset implements FSDatasetInterface {
|
||||||
public synchronized void deleteBlockPool(String bpid, boolean force)
|
public synchronized void deleteBlockPool(String bpid, boolean force)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (!force) {
|
if (!force) {
|
||||||
for (FSVolumeInterface volume : volumes.volumes) {
|
for (FSVolume volume : volumes.volumes) {
|
||||||
if (!((FSVolume)volume).isBPDirEmpty(bpid)) {
|
if (!volume.isBPDirEmpty(bpid)) {
|
||||||
DataNode.LOG.warn(bpid
|
DataNode.LOG.warn(bpid
|
||||||
+ " has some block files, cannot delete unless forced");
|
+ " has some block files, cannot delete unless forced");
|
||||||
throw new IOException("Cannot delete block pool, "
|
throw new IOException("Cannot delete block pool, "
|
||||||
|
@ -2599,8 +2598,8 @@ class FSDataset implements FSDatasetInterface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (FSVolumeInterface volume : volumes.volumes) {
|
for (FSVolume volume : volumes.volumes) {
|
||||||
((FSVolume)volume).deleteBPDirectories(bpid, force);
|
volume.deleteBPDirectories(bpid, force);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,13 +50,15 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface FSDatasetInterface extends FSDatasetMBean {
|
public interface FSDatasetInterface<V extends FSDatasetInterface.FSVolumeInterface>
|
||||||
|
extends FSDatasetMBean {
|
||||||
/**
|
/**
|
||||||
* A factory for creating FSDatasetInterface objects.
|
* A factory for creating FSDatasetInterface objects.
|
||||||
*/
|
*/
|
||||||
public abstract class Factory {
|
public abstract class Factory<D extends FSDatasetInterface<?>> {
|
||||||
/** @return the configured factory. */
|
/** @return the configured factory. */
|
||||||
public static Factory getFactory(Configuration conf) {
|
public static Factory<?> getFactory(Configuration conf) {
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
final Class<? extends Factory> clazz = conf.getClass(
|
final Class<? extends Factory> clazz = conf.getClass(
|
||||||
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
|
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
|
||||||
FSDataset.Factory.class,
|
FSDataset.Factory.class,
|
||||||
|
@ -65,7 +67,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a FSDatasetInterface object. */
|
/** Create a FSDatasetInterface object. */
|
||||||
public abstract FSDatasetInterface createFSDatasetInterface(
|
public abstract D createFSDatasetInterface(
|
||||||
DataNode datanode, DataStorage storage, Configuration conf
|
DataNode datanode, DataStorage storage, Configuration conf
|
||||||
) throws IOException;
|
) throws IOException;
|
||||||
|
|
||||||
|
@ -94,7 +96,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return a list of volumes. */
|
/** @return a list of volumes. */
|
||||||
public List<FSVolumeInterface> getVolumes();
|
public List<V> getVolumes();
|
||||||
|
|
||||||
/** @return a volume information map (name => info). */
|
/** @return a volume information map (name => info). */
|
||||||
public Map<String, Object> getVolumeInfoMap();
|
public Map<String, Object> getVolumeInfoMap();
|
||||||
|
@ -234,7 +236,7 @@ public interface FSDatasetInterface extends FSDatasetMBean {
|
||||||
this.checksum = checksum;
|
this.checksum = checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
void close() throws IOException {
|
void close() {
|
||||||
IOUtils.closeStream(dataOut);
|
IOUtils.closeStream(dataOut);
|
||||||
IOUtils.closeStream(checksumOut);
|
IOUtils.closeStream(checksumOut);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,13 +23,14 @@ import java.util.List;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
|
|
||||||
public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
|
public class RoundRobinVolumesPolicy<V extends FSVolumeInterface>
|
||||||
|
implements BlockVolumeChoosingPolicy<V> {
|
||||||
|
|
||||||
private int curVolume = 0;
|
private int curVolume = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized FSVolumeInterface chooseVolume(
|
public synchronized V chooseVolume(final List<V> volumes, final long blockSize
|
||||||
List<FSVolumeInterface> volumes, long blockSize) throws IOException {
|
) throws IOException {
|
||||||
if(volumes.size() < 1) {
|
if(volumes.size() < 1) {
|
||||||
throw new DiskOutOfSpaceException("No more available volumes");
|
throw new DiskOutOfSpaceException("No more available volumes");
|
||||||
}
|
}
|
||||||
|
@ -44,7 +45,7 @@ public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy {
|
||||||
long maxAvailable = 0;
|
long maxAvailable = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
FSVolumeInterface volume = volumes.get(curVolume);
|
final V volume = volumes.get(curVolume);
|
||||||
curVolume = (curVolume + 1) % volumes.size();
|
curVolume = (curVolume + 1) % volumes.size();
|
||||||
long availableVolumeSize = volume.getAvailable();
|
long availableVolumeSize = volume.getAvailable();
|
||||||
if (availableVolumeSize > blockSize) { return volume; }
|
if (availableVolumeSize > blockSize) { return volume; }
|
||||||
|
|
|
@ -1565,8 +1565,8 @@ public class MiniDFSCluster {
|
||||||
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
|
final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
|
||||||
bpid);
|
return DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1598,7 +1598,8 @@ public class MiniDFSCluster {
|
||||||
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
|
final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
|
||||||
|
final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
|
||||||
if (!(dataSet instanceof SimulatedFSDataset)) {
|
if (!(dataSet instanceof SimulatedFSDataset)) {
|
||||||
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
|
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
|
||||||
}
|
}
|
||||||
|
@ -1616,7 +1617,8 @@ public class MiniDFSCluster {
|
||||||
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
|
||||||
throw new IndexOutOfBoundsException();
|
throw new IndexOutOfBoundsException();
|
||||||
}
|
}
|
||||||
FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
|
final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
|
||||||
|
final FSDatasetInterface<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
|
||||||
if (!(dataSet instanceof SimulatedFSDataset)) {
|
if (!(dataSet instanceof SimulatedFSDataset)) {
|
||||||
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
|
throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
|
||||||
public class TestDFSRemove extends junit.framework.TestCase {
|
public class TestDFSRemove extends junit.framework.TestCase {
|
||||||
final Path dir = new Path("/test/remove/");
|
final Path dir = new Path("/test/remove/");
|
||||||
|
@ -45,7 +46,7 @@ public class TestDFSRemove extends junit.framework.TestCase {
|
||||||
static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
|
static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
|
||||||
long total = 0;
|
long total = 0;
|
||||||
for(DataNode node : cluster.getDataNodes()) {
|
for(DataNode node : cluster.getDataNodes()) {
|
||||||
total += node.getFSDataset().getDfsUsed();
|
total += DataNodeTestUtils.getFSDataset(node).getDfsUsed();
|
||||||
}
|
}
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||||
|
@ -209,8 +210,10 @@ public class TestFileCreation extends junit.framework.TestCase {
|
||||||
// can't check capacities for real storage since the OS file system may be changing under us.
|
// can't check capacities for real storage since the OS file system may be changing under us.
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
assertEquals(fileSize, dn.getFSDataset().getDfsUsed());
|
FSDatasetInterface<?> dataset = DataNodeTestUtils.getFSDataset(dn);
|
||||||
assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining());
|
assertEquals(fileSize, dataset.getDfsUsed());
|
||||||
|
assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize,
|
||||||
|
dataset.getRemaining());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
|
@ -41,6 +41,17 @@ public class DataNodeTestUtils {
|
||||||
return dn.getDNRegistrationForBP(bpid);
|
return dn.getDNRegistrationForBP(bpid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is used for testing.
|
||||||
|
* Examples are adding and deleting blocks directly.
|
||||||
|
* The most common usage will be when the data node's storage is simulated.
|
||||||
|
*
|
||||||
|
* @return the fsdataset that stores the blocks
|
||||||
|
*/
|
||||||
|
public static FSDatasetInterface<?> getFSDataset(DataNode dn) {
|
||||||
|
return dn.getFSDataset();
|
||||||
|
}
|
||||||
|
|
||||||
public static File getFile(DataNode dn, String bpid, long bid) {
|
public static File getFile(DataNode dn, String bpid, long bid) {
|
||||||
return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
|
return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,10 +61,11 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
*
|
*
|
||||||
* Note the synchronization is coarse grained - it is at each method.
|
* Note the synchronization is coarse grained - it is at each method.
|
||||||
*/
|
*/
|
||||||
public class SimulatedFSDataset implements FSDatasetInterface {
|
public class SimulatedFSDataset
|
||||||
static class Factory extends FSDatasetInterface.Factory {
|
implements FSDatasetInterface<FSDatasetInterface.FSVolumeInterface> {
|
||||||
|
static class Factory extends FSDatasetInterface.Factory<SimulatedFSDataset> {
|
||||||
@Override
|
@Override
|
||||||
public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
|
public SimulatedFSDataset createFSDatasetInterface(DataNode datanode,
|
||||||
DataStorage storage, Configuration conf) throws IOException {
|
DataStorage storage, Configuration conf) throws IOException {
|
||||||
return new SimulatedFSDataset(datanode, storage, conf);
|
return new SimulatedFSDataset(datanode, storage, conf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,13 +210,14 @@ public class TestBlockReport {
|
||||||
LOG.debug("Number of blocks allocated " + lBlocks.size());
|
LOG.debug("Number of blocks allocated " + lBlocks.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final DataNode dn0 = cluster.getDataNodes().get(DN_N0);
|
||||||
for (ExtendedBlock b : blocks2Remove) {
|
for (ExtendedBlock b : blocks2Remove) {
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Removing the block " + b.getBlockName());
|
LOG.debug("Removing the block " + b.getBlockName());
|
||||||
}
|
}
|
||||||
for (File f : findAllFiles(dataDir,
|
for (File f : findAllFiles(dataDir,
|
||||||
new MyFileFilter(b.getBlockName(), true))) {
|
new MyFileFilter(b.getBlockName(), true))) {
|
||||||
cluster.getDataNodes().get(DN_N0).getFSDataset().unfinalizeBlock(b);
|
DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
|
||||||
if (!f.delete())
|
if (!f.delete())
|
||||||
LOG.warn("Couldn't delete " + b.getBlockName());
|
LOG.warn("Couldn't delete " + b.getBlockName());
|
||||||
}
|
}
|
||||||
|
@ -225,9 +226,8 @@ public class TestBlockReport {
|
||||||
waitTil(DN_RESCAN_EXTRA_WAIT);
|
waitTil(DN_RESCAN_EXTRA_WAIT);
|
||||||
|
|
||||||
// all blocks belong to the same file, hence same BP
|
// all blocks belong to the same file, hence same BP
|
||||||
DataNode dn = cluster.getDataNodes().get(DN_N0);
|
|
||||||
String poolId = cluster.getNamesystem().getBlockPoolId();
|
String poolId = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
||||||
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
|
||||||
|
@ -602,15 +602,15 @@ public class TestBlockReport {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
// Look about specified DN for the replica of the block from 1st DN
|
// Look about specified DN for the replica of the block from 1st DN
|
||||||
|
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
|
||||||
|
final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
|
Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
|
||||||
fetchReplicaInfo(bpid, bl.getBlockId());
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (r == null) {
|
while (r == null) {
|
||||||
waitTil(5);
|
waitTil(5);
|
||||||
r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
|
r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
|
||||||
fetchReplicaInfo(bpid, bl.getBlockId());
|
|
||||||
long waiting_period = System.currentTimeMillis() - start;
|
long waiting_period = System.currentTimeMillis() - start;
|
||||||
if (count++ % 100 == 0)
|
if (count++ % 100 == 0)
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
|
|
|
@ -145,8 +145,11 @@ public class TestDataNodeVolumeFailure {
|
||||||
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
|
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
|
||||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid);
|
||||||
StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(),
|
final StorageBlockReport[] report = {
|
||||||
dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) };
|
new StorageBlockReport(dnR.getStorageID(),
|
||||||
|
DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid
|
||||||
|
).getBlockListAsLongs())
|
||||||
|
};
|
||||||
cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
|
cluster.getNameNodeRpc().blockReport(dnR, bpid, report);
|
||||||
|
|
||||||
// verify number of blocks and files...
|
// verify number of blocks and files...
|
||||||
|
|
|
@ -24,11 +24,7 @@ import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -38,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -47,12 +42,6 @@ import org.junit.Test;
|
||||||
* Test the ability of a DN to tolerate volume failures.
|
* Test the ability of a DN to tolerate volume failures.
|
||||||
*/
|
*/
|
||||||
public class TestDataNodeVolumeFailureToleration {
|
public class TestDataNodeVolumeFailureToleration {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureToleration.class);
|
|
||||||
{
|
|
||||||
((Log4JLogger)TestDataNodeVolumeFailureToleration.LOG).getLogger().setLevel(Level.ALL);
|
|
||||||
}
|
|
||||||
|
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
@ -130,7 +119,7 @@ public class TestDataNodeVolumeFailureToleration {
|
||||||
assertTrue("The DN should have started up fine.",
|
assertTrue("The DN should have started up fine.",
|
||||||
cluster.isDataNodeUp());
|
cluster.isDataNodeUp());
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
String si = dn.getFSDataset().getStorageInfo();
|
String si = DataNodeTestUtils.getFSDataset(dn).getStorageInfo();
|
||||||
assertTrue("The DN should have started with this directory",
|
assertTrue("The DN should have started with this directory",
|
||||||
si.contains(dataDir1Actual.getPath()));
|
si.contains(dataDir1Actual.getPath()));
|
||||||
assertFalse("The DN shouldn't have a bad directory.",
|
assertFalse("The DN shouldn't have a bad directory.",
|
||||||
|
@ -227,7 +216,7 @@ public class TestDataNodeVolumeFailureToleration {
|
||||||
*/
|
*/
|
||||||
private void testVolumeConfig(int volumesTolerated, int volumesFailed,
|
private void testVolumeConfig(int volumesTolerated, int volumesFailed,
|
||||||
boolean expectedBPServiceState, boolean manageDfsDirs)
|
boolean expectedBPServiceState, boolean manageDfsDirs)
|
||||||
throws IOException, InterruptedException, TimeoutException {
|
throws IOException, InterruptedException {
|
||||||
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
assumeTrue(!System.getProperty("os.name").startsWith("Windows"));
|
||||||
final int dnIndex = 0;
|
final int dnIndex = 0;
|
||||||
// Fail the current directory since invalid storage directory perms
|
// Fail the current directory since invalid storage directory perms
|
||||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface;
|
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link DirectoryScanner} handling of differences
|
* Tests {@link DirectoryScanner} handling of differences
|
||||||
|
@ -142,7 +142,7 @@ public class TestDirectoryScanner extends TestCase {
|
||||||
|
|
||||||
/** Create a block file in a random volume*/
|
/** Create a block file in a random volume*/
|
||||||
private long createBlockFile() throws IOException {
|
private long createBlockFile() throws IOException {
|
||||||
List<FSVolumeInterface> volumes = fds.getVolumes();
|
List<FSVolume> volumes = fds.getVolumes();
|
||||||
int index = rand.nextInt(volumes.size() - 1);
|
int index = rand.nextInt(volumes.size() - 1);
|
||||||
long id = getFreeBlockId();
|
long id = getFreeBlockId();
|
||||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||||
|
@ -155,7 +155,7 @@ public class TestDirectoryScanner extends TestCase {
|
||||||
|
|
||||||
/** Create a metafile in a random volume*/
|
/** Create a metafile in a random volume*/
|
||||||
private long createMetaFile() throws IOException {
|
private long createMetaFile() throws IOException {
|
||||||
List<FSVolumeInterface> volumes = fds.getVolumes();
|
List<FSVolume> volumes = fds.getVolumes();
|
||||||
int index = rand.nextInt(volumes.size() - 1);
|
int index = rand.nextInt(volumes.size() - 1);
|
||||||
long id = getFreeBlockId();
|
long id = getFreeBlockId();
|
||||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||||
|
@ -168,7 +168,7 @@ public class TestDirectoryScanner extends TestCase {
|
||||||
|
|
||||||
/** Create block file and corresponding metafile in a rondom volume */
|
/** Create block file and corresponding metafile in a rondom volume */
|
||||||
private long createBlockMetaFile() throws IOException {
|
private long createBlockMetaFile() throws IOException {
|
||||||
List<FSVolumeInterface> volumes = fds.getVolumes();
|
List<FSVolume> volumes = fds.getVolumes();
|
||||||
int index = rand.nextInt(volumes.size() - 1);
|
int index = rand.nextInt(volumes.size() - 1);
|
||||||
long id = getFreeBlockId();
|
long id = getFreeBlockId();
|
||||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||||
|
@ -228,7 +228,8 @@ public class TestDirectoryScanner extends TestCase {
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
|
fds = (FSDataset)DataNodeTestUtils.getFSDataset(
|
||||||
|
cluster.getDataNodes().get(0));
|
||||||
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
||||||
parallelism);
|
parallelism);
|
||||||
DataNode dn = cluster.getDataNodes().get(0);
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
|
|
@ -43,7 +43,9 @@ public class TestRoundRobinVolumesPolicy {
|
||||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
||||||
|
|
||||||
RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance(
|
@SuppressWarnings("unchecked")
|
||||||
|
final RoundRobinVolumesPolicy<FSVolumeInterface> policy =
|
||||||
|
(RoundRobinVolumesPolicy<FSVolumeInterface>)ReflectionUtils.newInstance(
|
||||||
RoundRobinVolumesPolicy.class, null);
|
RoundRobinVolumesPolicy.class, null);
|
||||||
|
|
||||||
// Test two rounds of round-robin choosing
|
// Test two rounds of round-robin choosing
|
||||||
|
@ -79,7 +81,8 @@ public class TestRoundRobinVolumesPolicy {
|
||||||
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
volumes.add(Mockito.mock(FSVolumeInterface.class));
|
||||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
|
||||||
|
|
||||||
RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy();
|
final RoundRobinVolumesPolicy<FSVolumeInterface> policy
|
||||||
|
= new RoundRobinVolumesPolicy<FSVolumeInterface>();
|
||||||
int blockSize = 700;
|
int blockSize = 700;
|
||||||
try {
|
try {
|
||||||
policy.chooseVolume(volumes, blockSize);
|
policy.chooseVolume(volumes, blockSize);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -28,8 +29,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
|
@ -56,7 +55,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
return blkid*BLOCK_LENGTH_MULTIPLIER;
|
return blkid*BLOCK_LENGTH_MULTIPLIER;
|
||||||
}
|
}
|
||||||
|
|
||||||
int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId)
|
int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int bytesAdded = 0;
|
int bytesAdded = 0;
|
||||||
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
|
for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
|
||||||
|
@ -83,24 +82,24 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
}
|
}
|
||||||
return bytesAdded;
|
return bytesAdded;
|
||||||
}
|
}
|
||||||
int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException {
|
int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException {
|
||||||
return addSomeBlocks(fsdataset, 1);
|
return addSomeBlocks(fsdataset, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFSDatasetFactory() {
|
public void testFSDatasetFactory() {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
|
FSDatasetInterface.Factory<?> f = FSDatasetInterface.Factory.getFactory(conf);
|
||||||
assertEquals(FSDataset.Factory.class, f.getClass());
|
assertEquals(FSDataset.Factory.class, f.getClass());
|
||||||
assertFalse(f.isSimulated());
|
assertFalse(f.isSimulated());
|
||||||
|
|
||||||
SimulatedFSDataset.setFactory(conf);
|
SimulatedFSDataset.setFactory(conf);
|
||||||
FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf);
|
FSDatasetInterface.Factory<?> s = FSDatasetInterface.Factory.getFactory(conf);
|
||||||
assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
|
assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
|
||||||
assertTrue(s.isSimulated());
|
assertTrue(s.isSimulated());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetMetaData() throws IOException {
|
public void testGetMetaData() throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
|
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
|
||||||
try {
|
try {
|
||||||
assertFalse(fsdataset.metaFileExists(b));
|
assertFalse(fsdataset.metaFileExists(b));
|
||||||
|
@ -121,7 +120,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
|
|
||||||
|
|
||||||
public void testStorageUsage() throws IOException {
|
public void testStorageUsage() throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
assertEquals(fsdataset.getDfsUsed(), 0);
|
assertEquals(fsdataset.getDfsUsed(), 0);
|
||||||
assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity());
|
assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity());
|
||||||
int bytesAdded = addSomeBlocks(fsdataset);
|
int bytesAdded = addSomeBlocks(fsdataset);
|
||||||
|
@ -131,7 +130,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b,
|
void checkBlockDataAndSize(SimulatedFSDataset fsdataset, ExtendedBlock b,
|
||||||
long expectedLen) throws IOException {
|
long expectedLen) throws IOException {
|
||||||
InputStream input = fsdataset.getBlockInputStream(b);
|
InputStream input = fsdataset.getBlockInputStream(b);
|
||||||
long lengthRead = 0;
|
long lengthRead = 0;
|
||||||
|
@ -144,7 +143,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWriteRead() throws IOException {
|
public void testWriteRead() throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
addSomeBlocks(fsdataset);
|
addSomeBlocks(fsdataset);
|
||||||
for (int i=1; i <= NUMBLOCKS; ++i) {
|
for (int i=1; i <= NUMBLOCKS; ++i) {
|
||||||
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
|
ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0);
|
||||||
|
@ -244,7 +243,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkInvalidBlock(ExtendedBlock b) throws IOException {
|
public void checkInvalidBlock(ExtendedBlock b) throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
assertFalse(fsdataset.isValidBlock(b));
|
assertFalse(fsdataset.isValidBlock(b));
|
||||||
try {
|
try {
|
||||||
fsdataset.getLength(b);
|
fsdataset.getLength(b);
|
||||||
|
@ -269,7 +268,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInValidBlocks() throws IOException {
|
public void testInValidBlocks() throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
|
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
|
||||||
checkInvalidBlock(b);
|
checkInvalidBlock(b);
|
||||||
|
|
||||||
|
@ -280,7 +279,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInvalidate() throws IOException {
|
public void testInvalidate() throws IOException {
|
||||||
FSDatasetInterface fsdataset = getSimulatedFSDataset();
|
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||||
int bytesAdded = addSomeBlocks(fsdataset);
|
int bytesAdded = addSomeBlocks(fsdataset);
|
||||||
Block[] deleteBlocks = new Block[2];
|
Block[] deleteBlocks = new Block[2];
|
||||||
deleteBlocks[0] = new Block(1, 0, 0);
|
deleteBlocks[0] = new Block(1, 0, 0);
|
||||||
|
|
Loading…
Reference in New Issue