HDFS-7758. Retire FsDatasetSpi#getVolumes() and use FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-05-05 10:55:04 -07:00
parent 3ff91e9e93
commit 24d3a2d4fd
28 changed files with 503 additions and 323 deletions

View File

@ -507,6 +507,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8237. Move all protocol classes used by ClientProtocol to hdfs-client.
(wheat9)
HDFS-7758. Retire FsDatasetSpi#getVolumes() and use
FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -527,59 +527,48 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
diffRecord.add(new ScanInfo(blockId, null, null, vol));
}
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FsDatasetSpi<?> dataset,
final FsVolumeSpi volume) {
for (FsVolumeSpi vol : dataset.getVolumes()) {
if (vol == volume) {
return true;
}
}
return false;
}
/** Get lists of blocks on the disk sorted by blockId, per blockpool */
private Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
// First get list of data directories
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
// Use an array since the threads may return out of order and
// compilersInProgress#keySet may return out of order as well.
ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()];
// Use an array since the threads may return out of order and
// compilersInProgress#keySet may return out of order as well.
dirReports = new ScanInfoPerBlockPool[volumes.size()];
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) {
for (int i = 0; i < volumes.size(); i++) {
ReportCompiler reportCompiler =
new ReportCompiler(datanode,volumes.get(i));
new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result =
reportCompileThreadPool.submit(reportCompiler);
reportCompileThreadPool.submit(reportCompiler);
compilersInProgress.put(i, result);
}
}
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) {
try {
dirReports[report.getKey()] = report.getValue().get();
} catch (Exception ex) {
LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with
throw new RuntimeException(ex);
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) {
try {
dirReports[report.getKey()] = report.getValue().get();
} catch (Exception ex) {
LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with
throw new RuntimeException(ex);
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
if (dirReports != null) {
// Compile consolidated report for all the volumes
for (ScanInfoPerBlockPool report : dirReports) {
list.addAll(report);
}
}
// Compile consolidated report for all the volumes
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) {
// volume is still valid
list.addAll(dirReports[i]);
}
}
return list.toSortedArrays();
}

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -50,7 +53,6 @@
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -58,7 +60,6 @@
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ReflectionUtils;
/**
@ -92,8 +93,96 @@ public boolean isSimulated() {
}
}
/** @return a list of volumes. */
public List<V> getVolumes();
/**
* It behaviors as an unmodifiable list of FsVolume. Individual FsVolume can
* be obtained by using {@link #get(int)}.
*
* This also holds the reference counts for these volumes. It releases all the
* reference counts in {@link #close()}.
*/
class FsVolumeReferences implements Iterable<FsVolumeSpi>, Closeable {
private final List<FsVolumeReference> references;
public <S extends FsVolumeSpi> FsVolumeReferences(List<S> curVolumes) {
references = new ArrayList<>();
for (FsVolumeSpi v : curVolumes) {
try {
references.add(v.obtainReference());
} catch (ClosedChannelException e) {
// This volume has been closed.
}
}
}
private static class FsVolumeSpiIterator implements
Iterator<FsVolumeSpi> {
private final List<FsVolumeReference> references;
private int idx = 0;
FsVolumeSpiIterator(List<FsVolumeReference> refs) {
references = refs;
}
@Override
public boolean hasNext() {
return idx < references.size();
}
@Override
public FsVolumeSpi next() {
int refIdx = idx++;
return references.get(refIdx).getVolume();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@Override
public Iterator<FsVolumeSpi> iterator() {
return new FsVolumeSpiIterator(references);
}
/**
* Get the number of volumes.
*/
public int size() {
return references.size();
}
/**
* Get the volume for a given index.
*/
public FsVolumeSpi get(int index) {
return references.get(index).getVolume();
}
@Override
public void close() throws IOException {
IOException ioe = null;
for (FsVolumeReference ref : references) {
try {
ref.close();
} catch (IOException e) {
ioe = e;
}
}
references.clear();
if (ioe != null) {
throw ioe;
}
}
}
/**
* Returns a list of FsVolumes that hold reference counts.
*
* The caller must release the reference of each volume by calling
* {@link FsVolumeReferences#close()}.
*/
public FsVolumeReferences getFsVolumeReferences();
/**
* Add a new volume to the FsDataset.<p/>

View File

@ -21,7 +21,7 @@
import java.io.IOException;
/**
* This is the interface for holding reference count as AutoClosable resource.
* This holds volume reference count as AutoClosable resource.
* It increases the reference count by one in the constructor, and decreases
* the reference count by one in {@link #close()}.
*
@ -37,12 +37,15 @@
*/
public interface FsVolumeReference extends Closeable {
/**
* Descrese the reference count of the volume.
* Decrease the reference count of the volume.
* @throws IOException it never throws IOException.
*/
@Override
public void close() throws IOException;
void close() throws IOException;
/** Returns the underlying volume object */
public FsVolumeSpi getVolume();
/**
* Returns the underlying volume object. Return null if the reference was
* released.
*/
FsVolumeSpi getVolume();
}

View File

@ -138,8 +138,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
return volumes.getVolumes();
public FsVolumeReferences getFsVolumeReferences() {
return new FsVolumeReferences(volumes.getVolumes());
}
@Override
@ -152,7 +152,7 @@ public StorageReport[] getStorageReports(String bpid)
throws IOException {
List<StorageReport> reports;
synchronized (statsLock) {
List<FsVolumeImpl> curVolumes = getVolumes();
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
reports = new ArrayList<>(curVolumes.size());
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
@ -231,7 +231,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final DataNode datanode;
final DataStorage dataStorage;
final FsVolumeList volumes;
private final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
final Daemon lazyWriter;
@ -540,7 +540,7 @@ public long getBlockPoolUsed(String bpid) throws IOException {
*/
@Override // FsDatasetSpi
public boolean hasEnoughResource() {
return getVolumes().size() >= validVolsRequired;
return volumes.getVolumes().size() >= validVolsRequired;
}
/**
@ -1628,7 +1628,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
Map<String, BlockListAsLongs.Builder> builders =
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = getVolumes();
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder());
}
@ -2535,7 +2535,7 @@ private static class VolumeInfo {
private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FsVolumeImpl volume : getVolumes()) {
for (FsVolumeImpl volume : volumes.getVolumes()) {
long used = 0;
long free = 0;
try (FsVolumeReference ref = volume.obtainReference()) {
@ -2571,7 +2571,7 @@ public Map<String, Object> getVolumeInfoMap() {
@Override //FsDatasetSpi
public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes();
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) {
for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) {
@ -2622,7 +2622,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes();
List<FsVolumeImpl> curVolumes = volumes.getVolumes();
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
@ -2730,7 +2730,7 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
}
private boolean ramDiskConfigured() {
for (FsVolumeImpl v: getVolumes()){
for (FsVolumeImpl v: volumes.getVolumes()){
if (v.isTransientStorage()) {
return true;
}
@ -2742,7 +2742,7 @@ private boolean ramDiskConfigured() {
// added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() {
for (FsVolumeImpl v: getVolumes()){
for (FsVolumeImpl v: volumes.getVolumes()){
setupAsyncLazyPersistThread(v);
}
}
@ -2880,14 +2880,13 @@ private boolean transientFreeSpaceBelowThreshold() throws IOException {
// Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN.
for (FsVolumeImpl v : getVolumes()) {
try (FsVolumeReference ref = v.obtainReference()) {
try (FsVolumeReferences volumes = getFsVolumeReferences()) {
for (FsVolumeSpi fvs : volumes) {
FsVolumeImpl v = (FsVolumeImpl) fvs;
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();
}
} catch (ClosedChannelException e) {
// ignore.
}
}

View File

@ -198,7 +198,7 @@ private void unreference() {
}
private static class FsVolumeReferenceImpl implements FsVolumeReference {
private final FsVolumeImpl volume;
private FsVolumeImpl volume;
FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
this.volume = volume;
@ -211,7 +211,10 @@ private static class FsVolumeReferenceImpl implements FsVolumeReference {
*/
@Override
public void close() throws IOException {
volume.unreference();
if (volume != null) {
volume.unreference();
volume = null;
}
}
@Override

View File

@ -276,10 +276,11 @@ public String toString() {
* @param ref a reference to the new FsVolumeImpl instance.
*/
void addVolume(FsVolumeReference ref) {
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
volumeList.add((FsVolumeImpl)ref.getVolume());
volumeList.add(volume);
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break;
@ -300,9 +301,9 @@ void addVolume(FsVolumeReference ref) {
}
// If the volume is used to replace a failed volume, it needs to reset the
// volume failure info for this volume.
removeVolumeFailureInfo(new File(ref.getVolume().getBasePath()));
removeVolumeFailureInfo(new File(volume.getBasePath()));
FsDatasetImpl.LOG.info("Added new volume: " +
ref.getVolume().getStorageID());
volume.getStorageID());
}
/**

View File

@ -1493,15 +1493,20 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
final int index = i - curDatanodesNum;
List<? extends FsVolumeSpi> volumes = dns[index].getFSDataset().getVolumes();
assert storageCapacities[index].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode;
try (FsDatasetSpi.FsVolumeReferences volumes =
dns[index].getFSDataset().getFsVolumeReferences()) {
assert storageCapacities[index].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
LOG.info("setCapacityForTesting " + storageCapacities[index][j]
+ " for [" + volume.getStorageType() + "]" + volume.getStorageID());
volume.setCapacityForTesting(storageCapacities[index][j]);
int j = 0;
for (FsVolumeSpi fvs : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) fvs;
LOG.info("setCapacityForTesting " + storageCapacities[index][j]
+ " for [" + volume.getStorageType() + "]" + volume
.getStorageID());
volume.setCapacityForTesting(storageCapacities[index][j]);
j++;
}
}
}
}

View File

@ -20,7 +20,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,7 +30,7 @@
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
@ -195,12 +194,14 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
assert volumes.size() == storagesPerDatanode;
try (FsDatasetSpi.FsVolumeReferences volumes =
dns[i].getFSDataset().getFsVolumeReferences()) {
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]);
for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]);
}
}
}
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -43,9 +44,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@ -199,15 +198,16 @@ public void testRemovingStorageDoesNotProduceZombies() throws Exception {
datanodeToRemoveStorageFromIdx++;
}
// Find the volume within the datanode which holds that first storage.
List<? extends FsVolumeSpi> volumes =
datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
String volumeDirectoryToRemove = null;
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
try (FsDatasetSpi.FsVolumeReferences volumes =
datanodeToRemoveStorageFrom.getFSDataset().getFsVolumeReferences()) {
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
}
}
}
};
// Shut down the datanode and remove the volume.
// Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory,

View File

@ -1270,7 +1270,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
}
@Override
public List<FsVolumeSpi> getVolumes() {
public FsVolumeReferences getFsVolumeReferences() {
throw new UnsupportedOperationException();
}

View File

@ -27,8 +27,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -115,11 +115,13 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
blocks.add(new FinalizedReplica(localBlock, null, null));
}
BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
reports[i] = new StorageBlockReport(dns, bll);
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
DatanodeStorage dns = new DatanodeStorage(volumes.get(i).getStorageID());
reports[i] = new StorageBlockReport(dns, bll);
}
}
// Should not assert!

View File

@ -82,7 +82,7 @@ private static class TestContext implements Closeable {
final DataNode datanode;
final BlockScanner blockScanner;
final FsDatasetSpi<? extends FsVolumeSpi> data;
final List<? extends FsVolumeSpi> volumes;
final FsDatasetSpi.FsVolumeReferences volumes;
TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices;
@ -109,11 +109,12 @@ private static class TestContext implements Closeable {
dfs[i].mkdirs(new Path("/test"));
}
data = datanode.getFSDataset();
volumes = data.getVolumes();
volumes = data.getFsVolumeReferences();
}
@Override
public void close() throws IOException {
volumes.close();
if (cluster != null) {
for (int i = 0; i < numNameServices; i++) {
dfs[i].delete(new Path("/test"), true);
@ -713,8 +714,7 @@ public void testMarkSuspectBlock() throws Exception {
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.datanode.getFSDataset().
getVolumes().get(0).getStorageID();
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(4);
info.shouldRun = true;

View File

@ -517,9 +517,12 @@ public void testAddVolumeFailures() throws IOException {
// Make sure that vol0 and vol2's metadata are not left in memory.
FsDatasetSpi<?> dataset = dn.getFSDataset();
for (FsVolumeSpi volume : dataset.getVolumes()) {
assertThat(volume.getBasePath(), is(not(anyOf(
is(newDirs.get(0)), is(newDirs.get(2))))));
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (FsVolumeSpi volume : volumes) {
assertThat(volume.getBasePath(), is(not(anyOf(
is(newDirs.get(0)), is(newDirs.get(2))))));
}
}
DataStorage storage = dn.getStorage();
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
@ -688,10 +691,14 @@ public void testAddBackRemovedVolume()
}
/** Get the FsVolume on the given basePath */
private FsVolumeImpl getVolume(DataNode dn, File basePath) {
for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
if (vol.getBasePath().equals(basePath.getPath())) {
return (FsVolumeImpl)vol;
private FsVolumeImpl getVolume(DataNode dn, File basePath)
throws IOException {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
if (vol.getBasePath().equals(basePath.getPath())) {
return (FsVolumeImpl) vol;
}
}
}
return null;

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -249,9 +248,11 @@ public void testFailedVolumeBeingRemovedFromDataNode()
// 2. dn0Vol1 is removed from FsDataset
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
for (FsVolumeSpi volume : data.getVolumes()) {
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) {
for (FsVolumeSpi volume : vols) {
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
}
}
// 3. all blocks on dn0Vol1 have been removed.

View File

@ -157,30 +157,37 @@ private long deleteMetaFile() {
private void duplicateBlock(long blockId) throws IOException {
synchronized (fds) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
for (FsVolumeSpi v : fds.getVolumes()) {
if (v.getStorageID().equals(b.getVolume().getStorageID())) {
continue;
}
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
for (FsVolumeSpi v : volumes) {
if (v.getStorageID().equals(b.getVolume().getStorageID())) {
continue;
}
// Volume without a copy of the block. Make a copy now.
File sourceBlock = b.getBlockFile();
File sourceMeta = b.getMetaFile();
String sourceRoot = b.getVolume().getBasePath();
String destRoot = v.getBasePath();
// Volume without a copy of the block. Make a copy now.
File sourceBlock = b.getBlockFile();
File sourceMeta = b.getMetaFile();
String sourceRoot = b.getVolume().getBasePath();
String destRoot = v.getBasePath();
String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
String relativeBlockPath =
new File(sourceRoot).toURI().relativize(sourceBlock.toURI())
.getPath();
String relativeMetaPath =
new File(sourceRoot).toURI().relativize(sourceMeta.toURI())
.getPath();
File destBlock = new File(destRoot, relativeBlockPath);
File destMeta = new File(destRoot, relativeMetaPath);
File destBlock = new File(destRoot, relativeBlockPath);
File destMeta = new File(destRoot, relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
FileUtils.copyFile(sourceMeta, destMeta);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
FileUtils.copyFile(sourceMeta, destMeta);
if (destBlock.exists() && destMeta.exists()) {
LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
if (destBlock.exists() && destMeta.exists()) {
LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
}
}
}
}
@ -209,58 +216,67 @@ private String getMetaFile(long id) {
/** Create a block file in a random volume*/
private long createBlockFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
}
}
return id;
}
/** Create a metafile in a random volume*/
private long createMetaFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
}
return id;
}
/** Create block file and corresponding metafile in a rondom volume */
private long createBlockMetaFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
// Create files with same prefix as block file but extension names
// such that during sorting, these files appear around meta file
// to test how DirectoryScanner handles extraneous files
String name1 = file.getAbsolutePath() + ".l";
String name2 = file.getAbsolutePath() + ".n";
file = new File(name1);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name1);
}
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
file = new File(name2);
File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name2);
}
LOG.info("Created block file " + file.getName());
file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
// Create files with same prefix as block file but extension names
// such that during sorting, these files appear around meta file
// to test how DirectoryScanner handles extraneous files
String name1 = file.getAbsolutePath() + ".l";
String name2 = file.getAbsolutePath() + ".n";
file = new File(name1);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name1);
}
file = new File(name2);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name2);
}
file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
}
}
return id;

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.util.DataChecksum;
@ -190,12 +191,15 @@ public void testLocalDirs() throws Exception {
// Check permissions on directories in 'dfs.datanode.data.dir'
FileSystem localFS = FileSystem.getLocal(conf);
for (DataNode dn : cluster.getDataNodes()) {
for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) {
String dir = v.getBasePath();
Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
String dir = vol.getBasePath();
Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
assertEquals("Permission for dir: " + dataDir + ", is " + actual +
", while expected is " + expected, expected, actual);
}
}
}
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@ -71,7 +72,10 @@ public void startCluster() throws IOException {
singletonDn = cluster.getDataNodes().get(0);
bpos = singletonDn.getAllBpOs().get(0);
actor = bpos.getBPServiceActors().get(0);
storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID();
try (FsDatasetSpi.FsVolumeReferences volumes =
singletonDn.getFSDataset().getFsVolumeReferences()) {
storageUuid = volumes.get(0).getStorageID();
}
}
private static Block getDummyBlock() {

View File

@ -29,7 +29,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
@ -39,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -142,48 +142,55 @@ public void verifyIncrementalBlockReports(boolean splitReports) throws IOExcepti
// Get the block list for the file with the block locations.
LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
// We will send 'fake' incremental block reports to the NN that look
// like they originated from DN 0.
StorageReceivedDeletedBlocks reports[] =
new StorageReceivedDeletedBlocks[dn0.getFSDataset().getVolumes().size()];
try (FsDatasetSpi.FsVolumeReferences volumes
= dn0.getFSDataset().getFsVolumeReferences()) {
// We will send 'fake' incremental block reports to the NN that look
// like they originated from DN 0.
StorageReceivedDeletedBlocks reports[] =
new StorageReceivedDeletedBlocks[volumes.size()];
// Lie to the NN that one block on each storage has been deleted.
for (int i = 0; i < reports.length; ++i) {
FsVolumeSpi volume = dn0.getFSDataset().getVolumes().get(i);
// Lie to the NN that one block on each storage has been deleted.
for (int i = 0; i < reports.length; ++i) {
FsVolumeSpi volume = volumes.get(i);
boolean foundBlockOnStorage = false;
ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
boolean foundBlockOnStorage = false;
ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
// Find the first block on this storage and mark it as deleted for the
// report.
for (LocatedBlock block : blocks.getLocatedBlocks()) {
if (block.getStorageIDs()[0].equals(volume.getStorageID())) {
rdbi[0] = new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null);
foundBlockOnStorage = true;
break;
// Find the first block on this storage and mark it as deleted for the
// report.
for (LocatedBlock block : blocks.getLocatedBlocks()) {
if (block.getStorageIDs()[0].equals(volume.getStorageID())) {
rdbi[0] =
new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null);
foundBlockOnStorage = true;
break;
}
}
assertTrue(foundBlockOnStorage);
reports[i] =
new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi);
if (splitReports) {
// If we are splitting reports then send the report for this storage now.
StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
cluster.getNameNodeRpc().blockReceivedAndDeleted(
dn0Reg, poolId, singletonReport);
}
}
assertTrue(foundBlockOnStorage);
reports[i] = new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi);
if (splitReports) {
// If we are splitting reports then send the report for this storage now.
StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
cluster.getNameNodeRpc().blockReceivedAndDeleted(
dn0Reg, poolId, singletonReport);
if (!splitReports) {
// Send a combined report.
cluster.getNameNodeRpc()
.blockReceivedAndDeleted(dn0Reg, poolId, reports);
}
}
if (!splitReports) {
// Send a combined report.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
// Make sure that the deleted block from each storage was picked up
// by the NameNode.
assertThat(cluster.getNamesystem().getMissingBlocksCount(),
is((long) reports.length));
}
// Make sure that the deleted block from each storage was picked up
// by the NameNode.
assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length));
}
/**

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -92,8 +93,11 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
DataNode datanode = cluster.getDataNodes().get(0);
BPServiceActor actor =
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
String storageUuid =
datanode.getFSDataset().getVolumes().get(0).getStorageID();
String storageUuid;
try (FsDatasetSpi.FsVolumeReferences volumes =
datanode.getFSDataset().getFsVolumeReferences()) {
storageUuid = volumes.get(0).getStorageID();
}
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
// Manually trigger a block report.

View File

@ -51,7 +51,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
StorageType.DEFAULT);
@Override
public List<ExternalVolumeImpl> getVolumes() {
public FsVolumeReferences getFsVolumeReferences() {
return null;
}

View File

@ -43,7 +43,6 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@ -62,11 +61,11 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@ -155,30 +154,34 @@ protected final LocatedBlocks ensureFileReplicasOnStorageType(
protected final void ensureLazyPersistBlocksAreSaved(
LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
final Set<Long> persistedBlockIds = new HashSet<Long>();
while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) {
// Take 1 second sleep before each verification iteration
Thread.sleep(1000);
try (FsDatasetSpi.FsVolumeReferences volumes =
cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
.size()) {
// Take 1 second sleep before each verification iteration
Thread.sleep(1000);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) {
continue;
}
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) {
continue;
}
FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir =
volume.getBlockPoolSlice(bpid).getLazypersistDir();
long blockId = lb.getBlock().getBlockId();
File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
// Found a persisted copy for this block and added to the Set
persistedBlockIds.add(blockId);
long blockId = lb.getBlock().getBlockId();
File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
// Found a persisted copy for this block and added to the Set
persistedBlockIds.add(blockId);
}
}
}
}
@ -432,18 +435,21 @@ protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
final FsDatasetSpi<?> dataset =
cluster.getDataNodes().get(0).getFSDataset();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) vol;
File targetDir = (volume.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
}
return true;

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
@ -101,13 +102,18 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
out.write(writeBuf);
out.hflush();
DataNode dn = cluster.getDataNodes().get(0);
for (FsVolumeSpi v : dataset(dn).getVolumes()) {
final FsVolumeImpl volume = (FsVolumeImpl)v;
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) {
if (isCorrupt && Block.isBlockFilename(file)) {
new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset(dn).getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
final FsVolumeImpl volume = (FsVolumeImpl) vol;
File currentDir =
volume.getCurrentDir().getParentFile().getParentFile();
File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) {
if (isCorrupt && Block.isBlockFilename(file)) {
new RandomAccessFile(file, "rw")
.setLength(fileLen - 1); // corrupt
}
}
}
}

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -56,7 +57,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -68,7 +68,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@ -124,6 +123,15 @@ private static void createStorageDirs(DataStorage storage, Configuration conf,
when(storage.getNumStorageDirs()).thenReturn(numDirs);
}
private int getNumVolumes() {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
return volumes.size();
} catch (IOException e) {
return 0;
}
}
@Before
public void setUp() throws IOException {
datanode = mock(DataNode.class);
@ -143,14 +151,14 @@ public void setUp() throws IOException {
dataset.addBlockPool(bpid, conf);
}
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int numExistingVolumes = getNumVolumes();
final int totalVolumes = numNewVolumes + numExistingVolumes;
Set<String> expectedVolumes = new HashSet<String>();
List<NamespaceInfo> nsInfos = Lists.newArrayList();
@ -172,13 +180,15 @@ public void testAddVolumes() throws IOException {
dataset.addVolume(loc, nsInfos);
}
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, getNumVolumes());
assertEquals(totalVolumes, dataset.storageMap.size());
Set<String> actualVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath());
}
}
assertEquals(actualVolumes.size(), expectedVolumes.size());
assertTrue(actualVolumes.containsAll(expectedVolumes));
@ -204,7 +214,7 @@ public void testRemoveVolumes() throws IOException {
dataset.removeVolumes(volumesToRemove, true);
int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.",
expectedNumVolumes, dataset.getVolumes().size());
expectedNumVolumes, getNumVolumes());
assertEquals("The volume has been removed from the storageMap.",
expectedNumVolumes, dataset.storageMap.size());
@ -231,7 +241,7 @@ public void run() {}
@Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = dataset.getVolumes().size();
final int numExistingVolumes = getNumVolumes();
List<NamespaceInfo> nsInfos = new ArrayList<>();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
@ -247,14 +257,14 @@ public void testRemoveNewlyAddedVolume() throws IOException {
.thenReturn(builder);
dataset.addVolume(loc, nsInfos);
assertEquals(numExistingVolumes + 1, dataset.getVolumes().size());
assertEquals(numExistingVolumes + 1, getNumVolumes());
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc.getFile());
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, dataset.getVolumes().size());
assertEquals(numExistingVolumes, getNumVolumes());
}
@Test(timeout = 5000)
@ -357,7 +367,10 @@ public void testDeletingBlocks() throws IOException {
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
FsVolumeImpl vol = ds.getVolumes().get(0);
FsVolumeImpl vol;
try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
vol = (FsVolumeImpl)volumes.get(0);
}
ExtendedBlock eb;
ReplicaInfo info;

View File

@ -35,7 +35,7 @@
import java.util.List;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
public class TestFsVolumeList {
@ -113,11 +113,6 @@ public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
conf, StorageType.DEFAULT);
FsVolumeReference ref = volume.obtainReference();
volumeList.addVolume(ref);
try {
ref.close();
fail("Should throw exception because the reference is closed in "
+ "VolumeList#addVolume().");
} catch (IllegalStateException e) {
}
assertNull(ref.getVolume());
}
}

View File

@ -32,7 +32,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
@ -44,7 +45,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
@ -64,6 +64,7 @@ public class TestRbwSpaceReservation {
private Configuration conf;
private DistributedFileSystem fs = null;
private DFSClient client = null;
FsVolumeReference singletonVolumeRef = null;
FsVolumeImpl singletonVolume = null;
private static Random rand = new Random();
@ -104,23 +105,22 @@ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacit
cluster.waitActive();
if (perVolumeCapacity >= 0) {
for (DataNode dn : cluster.getDataNodes()) {
for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
}
try (FsDatasetSpi.FsVolumeReferences volumes =
cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
singletonVolumeRef = volumes.get(0).obtainReference();
}
}
if (numDatanodes == 1) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@After
public void shutdownCluster() throws IOException {
if (singletonVolumeRef != null) {
singletonVolumeRef.close();
singletonVolumeRef = null;
}
if (client != null) {
client.close();
client = null;
@ -266,13 +266,16 @@ public void testSpaceReleasedOnUnexpectedEof()
// Ensure all space reserved for the replica was released on each
// DataNode.
for (DataNode dn : cluster.getDataNodes()) {
final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (volume.getReservedForRbw() == 0);
}
}, 500, Integer.MAX_VALUE); // Wait until the test times out.
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (volume.getReservedForRbw() == 0);
}
}, 500, Integer.MAX_VALUE); // Wait until the test times out.
}
}
}

View File

@ -45,6 +45,8 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
@ -161,31 +163,37 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep
};
ReplicaMap replicasMap = dataSet.volumeMap;
FsVolumeImpl vol = (FsVolumeImpl) dataSet.volumes
.getNextVolume(StorageType.DEFAULT, 0).getVolume();
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
try (FsDatasetSpi.FsVolumeReferences references =
dataSet.getFsVolumeReferences()) {
FsVolumeImpl vol = (FsVolumeImpl) references.get(0);
ReplicaInfo replicaInfo = new FinalizedReplica(
blocks[FINALIZED].getLocalBlock(), vol,
vol.getCurrentDir().getParentFile());
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(),
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(),
blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock())
.getParentFile(), 0));
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()), 2007));
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(),
null);
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap
.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()),
2007));
}
return blocks;
}
@ -538,9 +546,15 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
getFSDataset(dn);
ReplicaMap replicaMap = dataSet.volumeMap;
List<FsVolumeImpl> volumes = dataSet.getVolumes();
// number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, volumes.size());
List<FsVolumeImpl> volumes = null;
try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
// number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, referredVols.size());
volumes = new ArrayList<>(referredVols.size());
for (FsVolumeSpi vol : referredVols) {
volumes.add((FsVolumeImpl) vol);
}
}
ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
cluster.getNamesystem(0).getBlockPoolId(),
cluster.getNamesystem(1).getBlockPoolId()));

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -627,14 +628,18 @@ private void waitForAllReplicas(int expectedReplicaNum, Path file,
}
private void setVolumeFull(DataNode dn, StorageType type) {
List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
if (volume.getStorageType() == type) {
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
+ volume.getStorageID());
volume.setCapacityForTesting(0);
try (FsDatasetSpi.FsVolumeReferences refs = dn.getFSDataset()
.getFsVolumeReferences()) {
for (FsVolumeSpi fvs : refs) {
FsVolumeImpl volume = (FsVolumeImpl) fvs;
if (volume.getStorageType() == type) {
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
+ volume.getStorageID());
volume.setCapacityForTesting(0);
}
}
} catch (IOException e) {
LOG.error("Unexpected exception by closing FsVolumeReference", e);
}
}