HDFS-7758. Retire FsDatasetSpi#getVolumes() and use FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)

(cherry picked from commit 24d3a2d4fd)
This commit is contained in:
Colin Patrick Mccabe 2015-05-05 10:55:04 -07:00
parent 1f01d8347a
commit 1f6bcf94cc
28 changed files with 503 additions and 323 deletions

View File

@ -180,6 +180,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8237. Move all protocol classes used by ClientProtocol to hdfs-client. HDFS-8237. Move all protocol classes used by ClientProtocol to hdfs-client.
(wheat9) (wheat9)
HDFS-7758. Retire FsDatasetSpi#getVolumes() and use
FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -527,59 +527,48 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
diffRecord.add(new ScanInfo(blockId, null, null, vol)); diffRecord.add(new ScanInfo(blockId, null, null, vol));
} }
/** Is the given volume still valid in the dataset? */
private static boolean isValid(final FsDatasetSpi<?> dataset,
final FsVolumeSpi volume) {
for (FsVolumeSpi vol : dataset.getVolumes()) {
if (vol == volume) {
return true;
}
}
return false;
}
/** Get lists of blocks on the disk sorted by blockId, per blockpool */ /** Get lists of blocks on the disk sorted by blockId, per blockpool */
private Map<String, ScanInfo[]> getDiskReport() { private Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
// First get list of data directories // First get list of data directories
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes(); try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
// Use an array since the threads may return out of order and // Use an array since the threads may return out of order and
// compilersInProgress#keySet may return out of order as well. // compilersInProgress#keySet may return out of order as well.
ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()]; dirReports = new ScanInfoPerBlockPool[volumes.size()];
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress = Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>(); new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
for (int i = 0; i < volumes.size(); i++) { for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) {
ReportCompiler reportCompiler = ReportCompiler reportCompiler =
new ReportCompiler(datanode,volumes.get(i)); new ReportCompiler(datanode, volumes.get(i));
Future<ScanInfoPerBlockPool> result = Future<ScanInfoPerBlockPool> result =
reportCompileThreadPool.submit(reportCompiler); reportCompileThreadPool.submit(reportCompiler);
compilersInProgress.put(i, result); compilersInProgress.put(i, result);
} }
}
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report : for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) { compilersInProgress.entrySet()) {
try { try {
dirReports[report.getKey()] = report.getValue().get(); dirReports[report.getKey()] = report.getValue().get();
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error compiling report", ex); LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with // Propagate ex to DataBlockScanner to deal with
throw new RuntimeException(ex); throw new RuntimeException(ex);
}
}
} catch (IOException e) {
LOG.error("Unexpected IOException by closing FsVolumeReference", e);
}
if (dirReports != null) {
// Compile consolidated report for all the volumes
for (ScanInfoPerBlockPool report : dirReports) {
list.addAll(report);
} }
} }
// Compile consolidated report for all the volumes
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) {
// volume is still valid
list.addAll(dirReports[i]);
}
}
return list.toSortedArrays(); return list.toSortedArrays();
} }

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset; package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -50,7 +53,6 @@
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -58,7 +60,6 @@
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
/** /**
@ -92,8 +93,96 @@ public boolean isSimulated() {
} }
} }
/** @return a list of volumes. */ /**
public List<V> getVolumes(); * It behaviors as an unmodifiable list of FsVolume. Individual FsVolume can
* be obtained by using {@link #get(int)}.
*
* This also holds the reference counts for these volumes. It releases all the
* reference counts in {@link #close()}.
*/
class FsVolumeReferences implements Iterable<FsVolumeSpi>, Closeable {
private final List<FsVolumeReference> references;
public <S extends FsVolumeSpi> FsVolumeReferences(List<S> curVolumes) {
references = new ArrayList<>();
for (FsVolumeSpi v : curVolumes) {
try {
references.add(v.obtainReference());
} catch (ClosedChannelException e) {
// This volume has been closed.
}
}
}
private static class FsVolumeSpiIterator implements
Iterator<FsVolumeSpi> {
private final List<FsVolumeReference> references;
private int idx = 0;
FsVolumeSpiIterator(List<FsVolumeReference> refs) {
references = refs;
}
@Override
public boolean hasNext() {
return idx < references.size();
}
@Override
public FsVolumeSpi next() {
int refIdx = idx++;
return references.get(refIdx).getVolume();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@Override
public Iterator<FsVolumeSpi> iterator() {
return new FsVolumeSpiIterator(references);
}
/**
* Get the number of volumes.
*/
public int size() {
return references.size();
}
/**
* Get the volume for a given index.
*/
public FsVolumeSpi get(int index) {
return references.get(index).getVolume();
}
@Override
public void close() throws IOException {
IOException ioe = null;
for (FsVolumeReference ref : references) {
try {
ref.close();
} catch (IOException e) {
ioe = e;
}
}
references.clear();
if (ioe != null) {
throw ioe;
}
}
}
/**
* Returns a list of FsVolumes that hold reference counts.
*
* The caller must release the reference of each volume by calling
* {@link FsVolumeReferences#close()}.
*/
public FsVolumeReferences getFsVolumeReferences();
/** /**
* Add a new volume to the FsDataset.<p/> * Add a new volume to the FsDataset.<p/>

View File

@ -21,7 +21,7 @@
import java.io.IOException; import java.io.IOException;
/** /**
* This is the interface for holding reference count as AutoClosable resource. * This holds volume reference count as AutoClosable resource.
* It increases the reference count by one in the constructor, and decreases * It increases the reference count by one in the constructor, and decreases
* the reference count by one in {@link #close()}. * the reference count by one in {@link #close()}.
* *
@ -37,12 +37,15 @@
*/ */
public interface FsVolumeReference extends Closeable { public interface FsVolumeReference extends Closeable {
/** /**
* Descrese the reference count of the volume. * Decrease the reference count of the volume.
* @throws IOException it never throws IOException. * @throws IOException it never throws IOException.
*/ */
@Override @Override
public void close() throws IOException; void close() throws IOException;
/** Returns the underlying volume object */ /**
public FsVolumeSpi getVolume(); * Returns the underlying volume object. Return null if the reference was
* released.
*/
FsVolumeSpi getVolume();
} }

View File

@ -140,8 +140,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() { public FsVolumeReferences getFsVolumeReferences() {
return volumes.getVolumes(); return new FsVolumeReferences(volumes.getVolumes());
} }
@Override @Override
@ -154,7 +154,7 @@ public StorageReport[] getStorageReports(String bpid)
throws IOException { throws IOException {
List<StorageReport> reports; List<StorageReport> reports;
synchronized (statsLock) { synchronized (statsLock) {
List<FsVolumeImpl> curVolumes = getVolumes(); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
reports = new ArrayList<>(curVolumes.size()); reports = new ArrayList<>(curVolumes.size());
for (FsVolumeImpl volume : curVolumes) { for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) { try (FsVolumeReference ref = volume.obtainReference()) {
@ -233,7 +233,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
final DataNode datanode; final DataNode datanode;
final DataStorage dataStorage; final DataStorage dataStorage;
final FsVolumeList volumes; private final FsVolumeList volumes;
final Map<String, DatanodeStorage> storageMap; final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService; final FsDatasetAsyncDiskService asyncDiskService;
final Daemon lazyWriter; final Daemon lazyWriter;
@ -542,7 +542,7 @@ public long getBlockPoolUsed(String bpid) throws IOException {
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public boolean hasEnoughResource() { public boolean hasEnoughResource() {
return getVolumes().size() >= validVolsRequired; return volumes.getVolumes().size() >= validVolsRequired;
} }
/** /**
@ -1625,7 +1625,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
Map<String, BlockListAsLongs.Builder> builders = Map<String, BlockListAsLongs.Builder> builders =
new HashMap<String, BlockListAsLongs.Builder>(); new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = getVolumes(); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) { for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder()); builders.put(v.getStorageID(), BlockListAsLongs.builder());
} }
@ -2535,7 +2535,7 @@ private static class VolumeInfo {
private Collection<VolumeInfo> getVolumeInfo() { private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>(); Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FsVolumeImpl volume : getVolumes()) { for (FsVolumeImpl volume : volumes.getVolumes()) {
long used = 0; long used = 0;
long free = 0; long free = 0;
try (FsVolumeReference ref = volume.obtainReference()) { try (FsVolumeReference ref = volume.obtainReference()) {
@ -2571,7 +2571,7 @@ public Map<String, Object> getVolumeInfoMap() {
@Override //FsDatasetSpi @Override //FsDatasetSpi
public synchronized void deleteBlockPool(String bpid, boolean force) public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException { throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes(); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
if (!force) { if (!force) {
for (FsVolumeImpl volume : curVolumes) { for (FsVolumeImpl volume : curVolumes) {
try (FsVolumeReference ref = volume.obtainReference()) { try (FsVolumeReference ref = volume.obtainReference()) {
@ -2622,7 +2622,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
@Override // FsDatasetSpi @Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException { long[] blockIds) throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes(); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
// List of VolumeIds, one per volume on the datanode // List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size()); List<byte[]> blocksVolumeIds = new ArrayList<>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of // List of indexes into the list of VolumeIds, pointing at the VolumeId of
@ -2730,7 +2730,7 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
} }
private boolean ramDiskConfigured() { private boolean ramDiskConfigured() {
for (FsVolumeImpl v: getVolumes()){ for (FsVolumeImpl v: volumes.getVolumes()){
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
return true; return true;
} }
@ -2742,7 +2742,7 @@ private boolean ramDiskConfigured() {
// added or removed. // added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized. // This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() { private void setupAsyncLazyPersistThreads() {
for (FsVolumeImpl v: getVolumes()){ for (FsVolumeImpl v: volumes.getVolumes()){
setupAsyncLazyPersistThread(v); setupAsyncLazyPersistThread(v);
} }
} }
@ -2880,14 +2880,13 @@ private boolean transientFreeSpaceBelowThreshold() throws IOException {
// Don't worry about fragmentation for now. We don't expect more than one // Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN. // transient volume per DN.
for (FsVolumeImpl v : getVolumes()) { try (FsVolumeReferences volumes = getFsVolumeReferences()) {
try (FsVolumeReference ref = v.obtainReference()) { for (FsVolumeSpi fvs : volumes) {
FsVolumeImpl v = (FsVolumeImpl) fvs;
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
capacity += v.getCapacity(); capacity += v.getCapacity();
free += v.getAvailable(); free += v.getAvailable();
} }
} catch (ClosedChannelException e) {
// ignore.
} }
} }

View File

@ -198,7 +198,7 @@ private void unreference() {
} }
private static class FsVolumeReferenceImpl implements FsVolumeReference { private static class FsVolumeReferenceImpl implements FsVolumeReference {
private final FsVolumeImpl volume; private FsVolumeImpl volume;
FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException { FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
this.volume = volume; this.volume = volume;
@ -211,7 +211,10 @@ private static class FsVolumeReferenceImpl implements FsVolumeReference {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
volume.unreference(); if (volume != null) {
volume.unreference();
volume = null;
}
} }
@Override @Override

View File

@ -276,10 +276,11 @@ public String toString() {
* @param ref a reference to the new FsVolumeImpl instance. * @param ref a reference to the new FsVolumeImpl instance.
*/ */
void addVolume(FsVolumeReference ref) { void addVolume(FsVolumeReference ref) {
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
while (true) { while (true) {
final FsVolumeImpl[] curVolumes = volumes.get(); final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
volumeList.add((FsVolumeImpl)ref.getVolume()); volumeList.add(volume);
if (volumes.compareAndSet(curVolumes, if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break; break;
@ -300,9 +301,9 @@ void addVolume(FsVolumeReference ref) {
} }
// If the volume is used to replace a failed volume, it needs to reset the // If the volume is used to replace a failed volume, it needs to reset the
// volume failure info for this volume. // volume failure info for this volume.
removeVolumeFailureInfo(new File(ref.getVolume().getBasePath())); removeVolumeFailureInfo(new File(volume.getBasePath()));
FsDatasetImpl.LOG.info("Added new volume: " + FsDatasetImpl.LOG.info("Added new volume: " +
ref.getVolume().getStorageID()); volume.getStorageID());
} }
/** /**

View File

@ -1494,15 +1494,20 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
if (storageCapacities != null) { if (storageCapacities != null) {
for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) { for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) {
final int index = i - curDatanodesNum; final int index = i - curDatanodesNum;
List<? extends FsVolumeSpi> volumes = dns[index].getFSDataset().getVolumes(); try (FsDatasetSpi.FsVolumeReferences volumes =
assert storageCapacities[index].length == storagesPerDatanode; dns[index].getFSDataset().getFsVolumeReferences()) {
assert volumes.size() == storagesPerDatanode; assert storageCapacities[index].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) { int j = 0;
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j); for (FsVolumeSpi fvs : volumes) {
LOG.info("setCapacityForTesting " + storageCapacities[index][j] FsVolumeImpl volume = (FsVolumeImpl) fvs;
+ " for [" + volume.getStorageType() + "]" + volume.getStorageID()); LOG.info("setCapacityForTesting " + storageCapacities[index][j]
volume.setCapacityForTesting(storageCapacities[index][j]); + " for [" + volume.getStorageType() + "]" + volume
.getStorageID());
volume.setCapacityForTesting(storageCapacities[index][j]);
j++;
}
} }
} }
} }

View File

@ -20,7 +20,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,7 +30,7 @@
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
@ -195,12 +194,14 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
if (storageCapacities != null) { if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) { for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes(); try (FsDatasetSpi.FsVolumeReferences volumes =
assert volumes.size() == storagesPerDatanode; dns[i].getFSDataset().getFsVolumeReferences()) {
assert volumes.size() == storagesPerDatanode;
for (int j = 0; j < volumes.size(); ++j) { for (int j = 0; j < volumes.size(); ++j) {
FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j); FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
volume.setCapacityForTesting(storageCapacities[i][j]); volume.setCapacityForTesting(storageCapacities[i][j]);
}
} }
} }
} }

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -43,9 +44,7 @@
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -199,15 +198,16 @@ public void testRemovingStorageDoesNotProduceZombies() throws Exception {
datanodeToRemoveStorageFromIdx++; datanodeToRemoveStorageFromIdx++;
} }
// Find the volume within the datanode which holds that first storage. // Find the volume within the datanode which holds that first storage.
List<? extends FsVolumeSpi> volumes =
datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
assertEquals(NUM_STORAGES_PER_DN, volumes.size());
String volumeDirectoryToRemove = null; String volumeDirectoryToRemove = null;
for (FsVolumeSpi volume : volumes) { try (FsDatasetSpi.FsVolumeReferences volumes =
if (volume.getStorageID().equals(storageIdToRemove)) { datanodeToRemoveStorageFrom.getFSDataset().getFsVolumeReferences()) {
volumeDirectoryToRemove = volume.getBasePath(); assertEquals(NUM_STORAGES_PER_DN, volumes.size());
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageID().equals(storageIdToRemove)) {
volumeDirectoryToRemove = volume.getBasePath();
}
} }
} };
// Shut down the datanode and remove the volume. // Shut down the datanode and remove the volume.
// Replace the volume directory with a regular file, which will // Replace the volume directory with a regular file, which will
// cause a volume failure. (If we merely removed the directory, // cause a volume failure. (If we merely removed the directory,

View File

@ -1270,7 +1270,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
} }
@Override @Override
public List<FsVolumeSpi> getVolumes() { public FsVolumeReferences getFsVolumeReferences() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -27,8 +27,8 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -115,11 +115,13 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
blocks.add(new FinalizedReplica(localBlock, null, null)); blocks.add(new FinalizedReplica(localBlock, null, null));
} }
BlockListAsLongs bll = BlockListAsLongs.encode(blocks); try (FsDatasetSpi.FsVolumeReferences volumes =
for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) { dn.getFSDataset().getFsVolumeReferences()) {
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i); BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID()); for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
reports[i] = new StorageBlockReport(dns, bll); DatanodeStorage dns = new DatanodeStorage(volumes.get(i).getStorageID());
reports[i] = new StorageBlockReport(dns, bll);
}
} }
// Should not assert! // Should not assert!

View File

@ -82,7 +82,7 @@ private static class TestContext implements Closeable {
final DataNode datanode; final DataNode datanode;
final BlockScanner blockScanner; final BlockScanner blockScanner;
final FsDatasetSpi<? extends FsVolumeSpi> data; final FsDatasetSpi<? extends FsVolumeSpi> data;
final List<? extends FsVolumeSpi> volumes; final FsDatasetSpi.FsVolumeReferences volumes;
TestContext(Configuration conf, int numNameServices) throws Exception { TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices; this.numNameServices = numNameServices;
@ -109,11 +109,12 @@ private static class TestContext implements Closeable {
dfs[i].mkdirs(new Path("/test")); dfs[i].mkdirs(new Path("/test"));
} }
data = datanode.getFSDataset(); data = datanode.getFSDataset();
volumes = data.getVolumes(); volumes = data.getFsVolumeReferences();
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
volumes.close();
if (cluster != null) { if (cluster != null) {
for (int i = 0; i < numNameServices; i++) { for (int i = 0; i < numNameServices; i++) {
dfs[i].delete(new Path("/test"), true); dfs[i].delete(new Path("/test"), true);
@ -713,8 +714,7 @@ public void testMarkSuspectBlock() throws Exception {
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1); ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final TestScanResultHandler.Info info = final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0)); TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.datanode.getFSDataset(). String storageID = ctx.volumes.get(0).getStorageID();
getVolumes().get(0).getStorageID();
synchronized (info) { synchronized (info) {
info.sem = new Semaphore(4); info.sem = new Semaphore(4);
info.shouldRun = true; info.shouldRun = true;

View File

@ -517,9 +517,12 @@ public void testAddVolumeFailures() throws IOException {
// Make sure that vol0 and vol2's metadata are not left in memory. // Make sure that vol0 and vol2's metadata are not left in memory.
FsDatasetSpi<?> dataset = dn.getFSDataset(); FsDatasetSpi<?> dataset = dn.getFSDataset();
for (FsVolumeSpi volume : dataset.getVolumes()) { try (FsDatasetSpi.FsVolumeReferences volumes =
assertThat(volume.getBasePath(), is(not(anyOf( dataset.getFsVolumeReferences()) {
is(newDirs.get(0)), is(newDirs.get(2)))))); for (FsVolumeSpi volume : volumes) {
assertThat(volume.getBasePath(), is(not(anyOf(
is(newDirs.get(0)), is(newDirs.get(2))))));
}
} }
DataStorage storage = dn.getStorage(); DataStorage storage = dn.getStorage();
for (int i = 0; i < storage.getNumStorageDirs(); i++) { for (int i = 0; i < storage.getNumStorageDirs(); i++) {
@ -688,10 +691,14 @@ public void testAddBackRemovedVolume()
} }
/** Get the FsVolume on the given basePath */ /** Get the FsVolume on the given basePath */
private FsVolumeImpl getVolume(DataNode dn, File basePath) { private FsVolumeImpl getVolume(DataNode dn, File basePath)
for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) { throws IOException {
if (vol.getBasePath().equals(basePath.getPath())) { try (FsDatasetSpi.FsVolumeReferences volumes =
return (FsVolumeImpl)vol; dn.getFSDataset().getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
if (vol.getBasePath().equals(basePath.getPath())) {
return (FsVolumeImpl) vol;
}
} }
} }
return null; return null;

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -249,9 +248,11 @@ public void testFailedVolumeBeingRemovedFromDataNode()
// 2. dn0Vol1 is removed from FsDataset // 2. dn0Vol1 is removed from FsDataset
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset(); FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
for (FsVolumeSpi volume : data.getVolumes()) { try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) {
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(), for (FsVolumeSpi volume : vols) {
dn0Vol1.getAbsoluteFile()); assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
}
} }
// 3. all blocks on dn0Vol1 have been removed. // 3. all blocks on dn0Vol1 have been removed.

View File

@ -157,30 +157,37 @@ private long deleteMetaFile() {
private void duplicateBlock(long blockId) throws IOException { private void duplicateBlock(long blockId) throws IOException {
synchronized (fds) { synchronized (fds) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
for (FsVolumeSpi v : fds.getVolumes()) { try (FsDatasetSpi.FsVolumeReferences volumes =
if (v.getStorageID().equals(b.getVolume().getStorageID())) { fds.getFsVolumeReferences()) {
continue; for (FsVolumeSpi v : volumes) {
} if (v.getStorageID().equals(b.getVolume().getStorageID())) {
continue;
}
// Volume without a copy of the block. Make a copy now. // Volume without a copy of the block. Make a copy now.
File sourceBlock = b.getBlockFile(); File sourceBlock = b.getBlockFile();
File sourceMeta = b.getMetaFile(); File sourceMeta = b.getMetaFile();
String sourceRoot = b.getVolume().getBasePath(); String sourceRoot = b.getVolume().getBasePath();
String destRoot = v.getBasePath(); String destRoot = v.getBasePath();
String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath(); String relativeBlockPath =
String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath(); new File(sourceRoot).toURI().relativize(sourceBlock.toURI())
.getPath();
String relativeMetaPath =
new File(sourceRoot).toURI().relativize(sourceMeta.toURI())
.getPath();
File destBlock = new File(destRoot, relativeBlockPath); File destBlock = new File(destRoot, relativeBlockPath);
File destMeta = new File(destRoot, relativeMetaPath); File destMeta = new File(destRoot, relativeMetaPath);
destBlock.getParentFile().mkdirs(); destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock); FileUtils.copyFile(sourceBlock, destBlock);
FileUtils.copyFile(sourceMeta, destMeta); FileUtils.copyFile(sourceMeta, destMeta);
if (destBlock.exists() && destMeta.exists()) { if (destBlock.exists() && destMeta.exists()) {
LOG.info("Copied " + sourceBlock + " ==> " + destBlock); LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
LOG.info("Copied " + sourceMeta + " ==> " + destMeta); LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
}
} }
} }
} }
@ -209,58 +216,67 @@ private String getMetaFile(long id) {
/** Create a block file in a random volume*/ /** Create a block file in a random volume*/
private long createBlockFile() throws IOException { private long createBlockFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid); try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
File file = new File(finalizedDir, getBlockFile(id)); int numVolumes = volumes.size();
if (file.createNewFile()) { int index = rand.nextInt(numVolumes - 1);
LOG.info("Created block file " + file.getName()); File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
}
} }
return id; return id;
} }
/** Create a metafile in a random volume*/ /** Create a metafile in a random volume*/
private long createMetaFile() throws IOException { private long createMetaFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid); try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
File file = new File(finalizedDir, getMetaFile(id)); int numVolumes = refs.size();
if (file.createNewFile()) { int index = rand.nextInt(numVolumes - 1);
LOG.info("Created metafile " + file.getName());
File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
} }
return id; return id;
} }
/** Create block file and corresponding metafile in a rondom volume */ /** Create block file and corresponding metafile in a rondom volume */
private long createBlockMetaFile() throws IOException { private long createBlockMetaFile() throws IOException {
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
int index = rand.nextInt(volumes.size() - 1);
long id = getFreeBlockId(); long id = getFreeBlockId();
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
// Create files with same prefix as block file but extension names try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
// such that during sorting, these files appear around meta file int numVolumes = refs.size();
// to test how DirectoryScanner handles extraneous files int index = rand.nextInt(numVolumes - 1);
String name1 = file.getAbsolutePath() + ".l";
String name2 = file.getAbsolutePath() + ".n";
file = new File(name1);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name1);
}
file = new File(name2); File finalizedDir = refs.get(index).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) { if (file.createNewFile()) {
LOG.info("Created extraneous file " + name2); LOG.info("Created block file " + file.getName());
}
file = new File(finalizedDir, getMetaFile(id)); // Create files with same prefix as block file but extension names
if (file.createNewFile()) { // such that during sorting, these files appear around meta file
LOG.info("Created metafile " + file.getName()); // to test how DirectoryScanner handles extraneous files
String name1 = file.getAbsolutePath() + ".l";
String name2 = file.getAbsolutePath() + ".n";
file = new File(name1);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name1);
}
file = new File(name2);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name2);
}
file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
} }
} }
return id; return id;

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -190,12 +191,15 @@ public void testLocalDirs() throws Exception {
// Check permissions on directories in 'dfs.datanode.data.dir' // Check permissions on directories in 'dfs.datanode.data.dir'
FileSystem localFS = FileSystem.getLocal(conf); FileSystem localFS = FileSystem.getLocal(conf);
for (DataNode dn : cluster.getDataNodes()) { for (DataNode dn : cluster.getDataNodes()) {
for (FsVolumeSpi v : dn.getFSDataset().getVolumes()) { try (FsDatasetSpi.FsVolumeReferences volumes =
String dir = v.getBasePath(); dn.getFSDataset().getFsVolumeReferences()) {
Path dataDir = new Path(dir); for (FsVolumeSpi vol : volumes) {
FsPermission actual = localFS.getFileStatus(dataDir).getPermission(); String dir = vol.getBasePath();
Path dataDir = new Path(dir);
FsPermission actual = localFS.getFileStatus(dataDir).getPermission();
assertEquals("Permission for dir: " + dataDir + ", is " + actual + assertEquals("Permission for dir: " + dataDir + ", is " + actual +
", while expected is " + expected, expected, actual); ", while expected is " + expected, expected, actual);
}
} }
} }
} }

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@ -71,7 +72,10 @@ public void startCluster() throws IOException {
singletonDn = cluster.getDataNodes().get(0); singletonDn = cluster.getDataNodes().get(0);
bpos = singletonDn.getAllBpOs().get(0); bpos = singletonDn.getAllBpOs().get(0);
actor = bpos.getBPServiceActors().get(0); actor = bpos.getBPServiceActors().get(0);
storageUuid = singletonDn.getFSDataset().getVolumes().get(0).getStorageID(); try (FsDatasetSpi.FsVolumeReferences volumes =
singletonDn.getFSDataset().getFsVolumeReferences()) {
storageUuid = volumes.get(0).getStorageID();
}
} }
private static Block getDummyBlock() { private static Block getDummyBlock() {

View File

@ -29,7 +29,6 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
@ -39,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -142,48 +142,55 @@ public void verifyIncrementalBlockReports(boolean splitReports) throws IOExcepti
// Get the block list for the file with the block locations. // Get the block list for the file with the block locations.
LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName()); LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
// We will send 'fake' incremental block reports to the NN that look try (FsDatasetSpi.FsVolumeReferences volumes
// like they originated from DN 0. = dn0.getFSDataset().getFsVolumeReferences()) {
StorageReceivedDeletedBlocks reports[] = // We will send 'fake' incremental block reports to the NN that look
new StorageReceivedDeletedBlocks[dn0.getFSDataset().getVolumes().size()]; // like they originated from DN 0.
StorageReceivedDeletedBlocks reports[] =
new StorageReceivedDeletedBlocks[volumes.size()];
// Lie to the NN that one block on each storage has been deleted. // Lie to the NN that one block on each storage has been deleted.
for (int i = 0; i < reports.length; ++i) { for (int i = 0; i < reports.length; ++i) {
FsVolumeSpi volume = dn0.getFSDataset().getVolumes().get(i); FsVolumeSpi volume = volumes.get(i);
boolean foundBlockOnStorage = false; boolean foundBlockOnStorage = false;
ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1]; ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
// Find the first block on this storage and mark it as deleted for the // Find the first block on this storage and mark it as deleted for the
// report. // report.
for (LocatedBlock block : blocks.getLocatedBlocks()) { for (LocatedBlock block : blocks.getLocatedBlocks()) {
if (block.getStorageIDs()[0].equals(volume.getStorageID())) { if (block.getStorageIDs()[0].equals(volume.getStorageID())) {
rdbi[0] = new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(), rdbi[0] =
ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null); new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(),
foundBlockOnStorage = true; ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null);
break; foundBlockOnStorage = true;
break;
}
}
assertTrue(foundBlockOnStorage);
reports[i] =
new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi);
if (splitReports) {
// If we are splitting reports then send the report for this storage now.
StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
cluster.getNameNodeRpc().blockReceivedAndDeleted(
dn0Reg, poolId, singletonReport);
} }
} }
assertTrue(foundBlockOnStorage); if (!splitReports) {
reports[i] = new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi); // Send a combined report.
cluster.getNameNodeRpc()
if (splitReports) { .blockReceivedAndDeleted(dn0Reg, poolId, reports);
// If we are splitting reports then send the report for this storage now.
StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
cluster.getNameNodeRpc().blockReceivedAndDeleted(
dn0Reg, poolId, singletonReport);
} }
}
if (!splitReports) { // Make sure that the deleted block from each storage was picked up
// Send a combined report. // by the NameNode.
cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports); assertThat(cluster.getNamesystem().getMissingBlocksCount(),
is((long) reports.length));
} }
// Make sure that the deleted block from each storage was picked up
// by the NameNode.
assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length));
} }
/** /**

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@ -92,8 +93,11 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
DataNode datanode = cluster.getDataNodes().get(0); DataNode datanode = cluster.getDataNodes().get(0);
BPServiceActor actor = BPServiceActor actor =
datanode.getAllBpOs().get(0).getBPServiceActors().get(0); datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
String storageUuid = String storageUuid;
datanode.getFSDataset().getVolumes().get(0).getStorageID(); try (FsDatasetSpi.FsVolumeReferences volumes =
datanode.getFSDataset().getFsVolumeReferences()) {
storageUuid = volumes.get(0).getStorageID();
}
actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); actor.notifyNamenodeDeletedBlock(rdbi, storageUuid);
// Manually trigger a block report. // Manually trigger a block report.

View File

@ -51,7 +51,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
StorageType.DEFAULT); StorageType.DEFAULT);
@Override @Override
public List<ExternalVolumeImpl> getVolumes() { public FsVolumeReferences getFsVolumeReferences() {
return null; return null;
} }

View File

@ -43,7 +43,6 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -62,11 +61,11 @@
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.tools.JMXGet; import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@ -155,30 +154,34 @@ protected final LocatedBlocks ensureFileReplicasOnStorageType(
protected final void ensureLazyPersistBlocksAreSaved( protected final void ensureLazyPersistBlocksAreSaved(
LocatedBlocks locatedBlocks) throws IOException, InterruptedException { LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
final String bpid = cluster.getNamesystem().getBlockPoolId(); final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
final Set<Long> persistedBlockIds = new HashSet<Long>(); final Set<Long> persistedBlockIds = new HashSet<Long>();
while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) { try (FsDatasetSpi.FsVolumeReferences volumes =
// Take 1 second sleep before each verification iteration cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
Thread.sleep(1000); while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
.size()) {
// Take 1 second sleep before each verification iteration
Thread.sleep(1000);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) { for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
continue; continue;
} }
FsVolumeImpl volume = (FsVolumeImpl) v; FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir(); File lazyPersistDir =
volume.getBlockPoolSlice(bpid).getLazypersistDir();
long blockId = lb.getBlock().getBlockId(); long blockId = lb.getBlock().getBlockId();
File targetDir = File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId); DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName()); File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) { if (blockFile.exists()) {
// Found a persisted copy for this block and added to the Set // Found a persisted copy for this block and added to the Set
persistedBlockIds.add(blockId); persistedBlockIds.add(blockId);
}
} }
} }
} }
@ -432,18 +435,21 @@ protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
} }
final String bpid = cluster.getNamesystem().getBlockPoolId(); final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes = final FsDatasetSpi<?> dataset =
cluster.getDataNodes().get(0).getFSDataset().getVolumes(); cluster.getDataNodes().get(0).getFSDataset();
// Make sure deleted replica does not have a copy on either finalized dir of // Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume // transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) { try (FsDatasetSpi.FsVolumeReferences volumes =
FsVolumeImpl volume = (FsVolumeImpl) v; dataset.getFsVolumeReferences()) {
File targetDir = (v.isTransientStorage()) ? for (FsVolumeSpi vol : volumes) {
volume.getBlockPoolSlice(bpid).getFinalizedDir() : FsVolumeImpl volume = (FsVolumeImpl) vol;
volume.getBlockPoolSlice(bpid).getLazypersistDir(); File targetDir = (volume.isTransientStorage()) ?
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) { volume.getBlockPoolSlice(bpid).getFinalizedDir() :
return false; volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
} }
} }
return true; return true;

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.junit.Assert; import org.junit.Assert;
@ -101,13 +102,18 @@ private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
out.write(writeBuf); out.write(writeBuf);
out.hflush(); out.hflush();
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
for (FsVolumeSpi v : dataset(dn).getVolumes()) { try (FsDatasetSpi.FsVolumeReferences volumes =
final FsVolumeImpl volume = (FsVolumeImpl)v; dataset(dn).getFsVolumeReferences()) {
File currentDir = volume.getCurrentDir().getParentFile().getParentFile(); for (FsVolumeSpi vol : volumes) {
File rbwDir = new File(currentDir, "rbw"); final FsVolumeImpl volume = (FsVolumeImpl) vol;
for (File file : rbwDir.listFiles()) { File currentDir =
if (isCorrupt && Block.isBlockFilename(file)) { volume.getCurrentDir().getParentFile().getParentFile();
new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt File rbwDir = new File(currentDir, "rbw");
for (File file : rbwDir.listFiles()) {
if (isCorrupt && Block.isBlockFilename(file)) {
new RandomAccessFile(file, "rw")
.setLength(fileLen - 1); // corrupt
}
} }
} }
} }

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@ -56,7 +57,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -68,7 +68,6 @@
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -124,6 +123,15 @@ private static void createStorageDirs(DataStorage storage, Configuration conf,
when(storage.getNumStorageDirs()).thenReturn(numDirs); when(storage.getNumStorageDirs()).thenReturn(numDirs);
} }
private int getNumVolumes() {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
return volumes.size();
} catch (IOException e) {
return 0;
}
}
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
datanode = mock(DataNode.class); datanode = mock(DataNode.class);
@ -143,14 +151,14 @@ public void setUp() throws IOException {
dataset.addBlockPool(bpid, conf); dataset.addBlockPool(bpid, conf);
} }
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size()); assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
assertEquals(0, dataset.getNumFailedVolumes()); assertEquals(0, dataset.getNumFailedVolumes());
} }
@Test @Test
public void testAddVolumes() throws IOException { public void testAddVolumes() throws IOException {
final int numNewVolumes = 3; final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size(); final int numExistingVolumes = getNumVolumes();
final int totalVolumes = numNewVolumes + numExistingVolumes; final int totalVolumes = numNewVolumes + numExistingVolumes;
Set<String> expectedVolumes = new HashSet<String>(); Set<String> expectedVolumes = new HashSet<String>();
List<NamespaceInfo> nsInfos = Lists.newArrayList(); List<NamespaceInfo> nsInfos = Lists.newArrayList();
@ -172,13 +180,15 @@ public void testAddVolumes() throws IOException {
dataset.addVolume(loc, nsInfos); dataset.addVolume(loc, nsInfos);
} }
assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, getNumVolumes());
assertEquals(totalVolumes, dataset.storageMap.size()); assertEquals(totalVolumes, dataset.storageMap.size());
Set<String> actualVolumes = new HashSet<String>(); Set<String> actualVolumes = new HashSet<String>();
for (int i = 0; i < numNewVolumes; i++) { try (FsDatasetSpi.FsVolumeReferences volumes =
actualVolumes.add( dataset.getFsVolumeReferences()) {
dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); for (int i = 0; i < numNewVolumes; i++) {
actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath());
}
} }
assertEquals(actualVolumes.size(), expectedVolumes.size()); assertEquals(actualVolumes.size(), expectedVolumes.size());
assertTrue(actualVolumes.containsAll(expectedVolumes)); assertTrue(actualVolumes.containsAll(expectedVolumes));
@ -204,7 +214,7 @@ public void testRemoveVolumes() throws IOException {
dataset.removeVolumes(volumesToRemove, true); dataset.removeVolumes(volumesToRemove, true);
int expectedNumVolumes = dataDirs.length - 1; int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.", assertEquals("The volume has been removed from the volumeList.",
expectedNumVolumes, dataset.getVolumes().size()); expectedNumVolumes, getNumVolumes());
assertEquals("The volume has been removed from the storageMap.", assertEquals("The volume has been removed from the storageMap.",
expectedNumVolumes, dataset.storageMap.size()); expectedNumVolumes, dataset.storageMap.size());
@ -231,7 +241,7 @@ public void run() {}
@Test(timeout = 5000) @Test(timeout = 5000)
public void testRemoveNewlyAddedVolume() throws IOException { public void testRemoveNewlyAddedVolume() throws IOException {
final int numExistingVolumes = dataset.getVolumes().size(); final int numExistingVolumes = getNumVolumes();
List<NamespaceInfo> nsInfos = new ArrayList<>(); List<NamespaceInfo> nsInfos = new ArrayList<>();
for (String bpid : BLOCK_POOL_IDS) { for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
@ -247,14 +257,14 @@ public void testRemoveNewlyAddedVolume() throws IOException {
.thenReturn(builder); .thenReturn(builder);
dataset.addVolume(loc, nsInfos); dataset.addVolume(loc, nsInfos);
assertEquals(numExistingVolumes + 1, dataset.getVolumes().size()); assertEquals(numExistingVolumes + 1, getNumVolumes());
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
Set<File> volumesToRemove = new HashSet<>(); Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(loc.getFile()); volumesToRemove.add(loc.getFile());
dataset.removeVolumes(volumesToRemove, true); dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, dataset.getVolumes().size()); assertEquals(numExistingVolumes, getNumVolumes());
} }
@Test(timeout = 5000) @Test(timeout = 5000)
@ -357,7 +367,10 @@ public void testDeletingBlocks() throws IOException {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
FsVolumeImpl vol = ds.getVolumes().get(0); FsVolumeImpl vol;
try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) {
vol = (FsVolumeImpl)volumes.get(0);
}
ExtendedBlock eb; ExtendedBlock eb;
ReplicaInfo info; ReplicaInfo info;

View File

@ -35,7 +35,7 @@
import java.util.List; import java.util.List;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class TestFsVolumeList { public class TestFsVolumeList {
@ -113,11 +113,6 @@ public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
conf, StorageType.DEFAULT); conf, StorageType.DEFAULT);
FsVolumeReference ref = volume.obtainReference(); FsVolumeReference ref = volume.obtainReference();
volumeList.addVolume(ref); volumeList.addVolume(ref);
try { assertNull(ref.getVolume());
ref.close();
fail("Should throw exception because the reference is closed in "
+ "VolumeList#addVolume().");
} catch (IllegalStateException e) {
}
} }
} }

View File

@ -32,7 +32,8 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -44,7 +45,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -64,6 +64,7 @@ public class TestRbwSpaceReservation {
private Configuration conf; private Configuration conf;
private DistributedFileSystem fs = null; private DistributedFileSystem fs = null;
private DFSClient client = null; private DFSClient client = null;
FsVolumeReference singletonVolumeRef = null;
FsVolumeImpl singletonVolume = null; FsVolumeImpl singletonVolume = null;
private static Random rand = new Random(); private static Random rand = new Random();
@ -104,23 +105,22 @@ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacit
cluster.waitActive(); cluster.waitActive();
if (perVolumeCapacity >= 0) { if (perVolumeCapacity >= 0) {
for (DataNode dn : cluster.getDataNodes()) { try (FsDatasetSpi.FsVolumeReferences volumes =
for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) { cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity); singletonVolumeRef = volumes.get(0).obtainReference();
}
} }
} singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
singletonVolume.setCapacityForTesting(perVolumeCapacity);
if (numDatanodes == 1) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
} }
} }
@After @After
public void shutdownCluster() throws IOException { public void shutdownCluster() throws IOException {
if (singletonVolumeRef != null) {
singletonVolumeRef.close();
singletonVolumeRef = null;
}
if (client != null) { if (client != null) {
client.close(); client.close();
client = null; client = null;
@ -266,13 +266,16 @@ public void testSpaceReleasedOnUnexpectedEof()
// Ensure all space reserved for the replica was released on each // Ensure all space reserved for the replica was released on each
// DataNode. // DataNode.
for (DataNode dn : cluster.getDataNodes()) { for (DataNode dn : cluster.getDataNodes()) {
final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0); try (FsDatasetSpi.FsVolumeReferences volumes =
GenericTestUtils.waitFor(new Supplier<Boolean>() { dn.getFSDataset().getFsVolumeReferences()) {
@Override final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
public Boolean get() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
return (volume.getReservedForRbw() == 0); @Override
} public Boolean get() {
}, 500, Integer.MAX_VALUE); // Wait until the test times out. return (volume.getReservedForRbw() == 0);
}
}, 500, Integer.MAX_VALUE); // Wait until the test times out.
}
} }
} }

View File

@ -45,6 +45,8 @@
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert; import org.junit.Assert;
@ -161,31 +163,37 @@ private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOExcep
}; };
ReplicaMap replicasMap = dataSet.volumeMap; ReplicaMap replicasMap = dataSet.volumeMap;
FsVolumeImpl vol = (FsVolumeImpl) dataSet.volumes try (FsDatasetSpi.FsVolumeReferences references =
.getNextVolume(StorageType.DEFAULT, 0).getVolume(); dataSet.getFsVolumeReferences()) {
ReplicaInfo replicaInfo = new FinalizedReplica( FsVolumeImpl vol = (FsVolumeImpl) references.get(0);
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); ReplicaInfo replicaInfo = new FinalizedReplica(
replicasMap.add(bpid, replicaInfo); blocks[FINALIZED].getLocalBlock(), vol,
replicaInfo.getBlockFile().createNewFile(); vol.getCurrentDir().getParentFile());
replicaInfo.getMetaFile().createNewFile(); replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaInPipeline( replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(), blocks[TEMPORARY].getBlockId(),
blocks[TEMPORARY].getGenerationStamp(), vol, blocks[TEMPORARY].getGenerationStamp(), vol,
vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0)); vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock())
.getParentFile(), 0));
replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null); vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(),
replicasMap.add(bpid, replicaInfo); null);
replicaInfo.getBlockFile().createNewFile(); replicasMap.add(bpid, replicaInfo);
replicaInfo.getMetaFile().createNewFile(); replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()), 2007));
replicasMap.add(bpid, new ReplicaWaitingToBeRecovered(
blocks[RWR].getLocalBlock(), vol, vol.createRbwFile(bpid,
blocks[RWR].getLocalBlock()).getParentFile()));
replicasMap
.add(bpid, new ReplicaUnderRecovery(new FinalizedReplica(blocks[RUR]
.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()),
2007));
}
return blocks; return blocks;
} }
@ -538,9 +546,15 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
getFSDataset(dn); getFSDataset(dn);
ReplicaMap replicaMap = dataSet.volumeMap; ReplicaMap replicaMap = dataSet.volumeMap;
List<FsVolumeImpl> volumes = dataSet.getVolumes(); List<FsVolumeImpl> volumes = null;
// number of volumes should be 2 - [data1, data2] try (FsDatasetSpi.FsVolumeReferences referredVols = dataSet.getFsVolumeReferences()) {
assertEquals("number of volumes is wrong", 2, volumes.size()); // number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, referredVols.size());
volumes = new ArrayList<>(referredVols.size());
for (FsVolumeSpi vol : referredVols) {
volumes.add((FsVolumeImpl) vol);
}
}
ArrayList<String> bpList = new ArrayList<String>(Arrays.asList( ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
cluster.getNamesystem(0).getBlockPoolId(), cluster.getNamesystem(0).getBlockPoolId(),
cluster.getNamesystem(1).getBlockPoolId())); cluster.getNamesystem(1).getBlockPoolId()));

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -655,14 +656,18 @@ private void waitForAllReplicas(int expectedReplicaNum, Path file,
} }
private void setVolumeFull(DataNode dn, StorageType type) { private void setVolumeFull(DataNode dn, StorageType type) {
List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes(); try (FsDatasetSpi.FsVolumeReferences refs = dn.getFSDataset()
for (FsVolumeSpi v : volumes) { .getFsVolumeReferences()) {
FsVolumeImpl volume = (FsVolumeImpl) v; for (FsVolumeSpi fvs : refs) {
if (volume.getStorageType() == type) { FsVolumeImpl volume = (FsVolumeImpl) fvs;
LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]" if (volume.getStorageType() == type) {
+ volume.getStorageID()); LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]"
volume.setCapacityForTesting(0); + volume.getStorageID());
volume.setCapacityForTesting(0);
}
} }
} catch (IOException e) {
LOG.error("Unexpected exception by closing FsVolumeReference", e);
} }
} }