HDFS-6774. Make FsDataset and DataStore support removing volumes. Contributed by Lei Xu.

This commit is contained in:
Aaron T. Myers 2014-08-29 12:59:23 -07:00
parent 15366d9227
commit 7eab2a29a5
10 changed files with 245 additions and 7 deletions

View File

@ -427,6 +427,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick HDFS-6879. Adding tracing to Hadoop RPC (Masatake Iwasaki via Colin Patrick
McCabe) McCabe)
HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu
via atm)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -201,6 +201,20 @@ public class BlockPoolSliceStorage extends Storage {
writeProperties(bpSdir); writeProperties(bpSdir);
} }
/**
* Remove storage directories.
* @param storageDirs a set of storage directories to be removed.
*/
void removeVolumes(Set<File> storageDirs) {
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (storageDirs.contains(sd.getRoot())) {
it.remove();
}
}
}
/** /**
* Set layoutVersion, namespaceID and blockpoolID into block pool storage * Set layoutVersion, namespaceID and blockpoolID into block pool storage
* VERSION file * VERSION file

View File

@ -336,6 +336,33 @@ public class DataStorage extends Storage {
} }
} }
/**
* Remove volumes from DataStorage.
* @param locations a collection of volumes.
*/
synchronized void removeVolumes(Collection<StorageLocation> locations) {
if (locations.isEmpty()) {
return;
}
Set<File> dataDirs = new HashSet<File>();
for (StorageLocation sl : locations) {
dataDirs.add(sl.getFile());
}
for (BlockPoolSliceStorage bpsStorage : this.bpStorageMap.values()) {
bpsStorage.removeVolumes(dataDirs);
}
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
if (dataDirs.contains(sd.getRoot())) {
it.remove();
}
}
}
/** /**
* Analyze storage directories. * Analyze storage directories.
* Recover from previous transitions if required. * Recover from previous transitions if required.

View File

@ -97,6 +97,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public void addVolumes(Collection<StorageLocation> volumes) public void addVolumes(Collection<StorageLocation> volumes)
throws IOException; throws IOException;
/** Removes a collection of volumes from FsDataset. */
public void removeVolumes(Collection<StorageLocation> volumes);
/** @return a storage with the given storage ID */ /** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid); public DatanodeStorage getStorage(final String storageUuid);

View File

@ -340,7 +340,7 @@ class BlockPoolSlice {
loadRwr = false; loadRwr = false;
} }
sc.close(); sc.close();
if (restartMeta.delete()) { if (!restartMeta.delete()) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath()); restartMeta.getPath());
} }

View File

@ -118,6 +118,24 @@ class FsDatasetAsyncDiskService {
} }
addExecutorForVolume(volume); addExecutorForVolume(volume);
} }
/**
* Stops AsyncDiskService for a volume.
* @param volume the root of the volume.
*/
synchronized void removeVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncDiskService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
if (executor == null) {
throw new RuntimeException("Can not find volume " + volume
+ " to remove.");
} else {
executor.shutdown();
executors.remove(volume);
}
}
synchronized long countPendingDeletions() { synchronized long countPendingDeletions() {
long count = 0; long count = 0;

View File

@ -30,9 +30,11 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
@ -314,6 +316,51 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
} }
/**
* Removes a collection of volumes from FsDataset.
* @param volumes the root directories of the volumes.
*
* DataNode should call this function before calling
* {@link DataStorage#removeVolumes(java.util.Collection)}.
*/
@Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
Set<File> volumeSet = new HashSet<File>();
for (StorageLocation sl : volumes) {
volumeSet.add(sl.getFile());
}
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
if (volumeSet.contains(sd.getRoot())) {
String volume = sd.getRoot().toString();
LOG.info("Removing " + volume + " from FsDataset.");
this.volumes.removeVolume(volume);
storageMap.remove(sd.getStorageUuid());
asyncDiskService.removeVolume(sd.getCurrentDir());
// Removed all replica information for the blocks on the volume. Unlike
// updating the volumeMap in addVolume(), this operation does not scan
// disks.
for (String bpid : volumeMap.getBlockPoolList()) {
List<Block> blocks = new ArrayList<Block>();
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
it.hasNext(); ) {
ReplicaInfo block = it.next();
if (block.getVolume().getBasePath().equals(volume)) {
invalidate(bpid, block.getBlockId());
blocks.add(block);
it.remove();
}
}
// Delete blocks from the block scanner in batch.
datanode.getBlockScanner().deleteBlocks(bpid,
blocks.toArray(new Block[blocks.size()]));
}
}
}
}
private StorageType getStorageTypeFromLocations( private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) { Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) { for (StorageLocation dataLocation : dataLocations) {
@ -1294,6 +1341,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
} }
/**
* Invalidate a block but does not delete the actual on-disk block file.
*
* It should only be used for decommissioning disks.
*
* @param bpid the block pool ID.
* @param blockId the ID of the block.
*/
public void invalidate(String bpid, long blockId) {
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
// The short-circuit registry is null in the unit tests, because the
// datanode is mock object.
if (datanode.getShortCircuitRegistry() != null) {
datanode.getShortCircuitRegistry().processBlockInvalidation(
new ExtendedBlockId(blockId, bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, blockId);
}
}
/** /**
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}. * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
*/ */

View File

@ -212,6 +212,25 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString()); FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
} }
/**
* Dynamically remove volume to the list.
* @param volume the volume to be removed.
*/
synchronized void removeVolume(String volume) {
// Make a copy of volumes to remove one volume.
final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
if (fsVolume.getBasePath().equals(volume)) {
fsVolume.shutdown();
it.remove();
volumes = Collections.unmodifiableList(volumeList);
FsDatasetImpl.LOG.info("Removed volume: " + volume);
break;
}
}
}
void addBlockPool(final String bpid, final Configuration conf) throws IOException { void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow(); long totalStartTime = Time.monotonicNow();

View File

@ -1120,6 +1120,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
throw new UnsupportedOperationException();
}
@Override @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) { FileDescriptor fd, long offset, long nbytes, int flags) {

View File

@ -18,12 +18,20 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -35,25 +43,44 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TestFsDatasetImpl { public class TestFsDatasetImpl {
private static final String BASE_DIR = private static final String BASE_DIR =
System.getProperty("test.build.dir") + "/fsdatasetimpl"; new FileSystemTestHelper().getTestRootDir();
private static final int NUM_INIT_VOLUMES = 2; private static final int NUM_INIT_VOLUMES = 2;
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
// Use to generate storageUuid
private static final DataStorage dsForStorageUuid = new DataStorage(
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
private Configuration conf;
private DataStorage storage; private DataStorage storage;
private DataBlockScanner scanner;
private FsDatasetImpl dataset; private FsDatasetImpl dataset;
private static Storage.StorageDirectory createStorageDirectory(File root) {
Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
dsForStorageUuid.createStorageID(sd);
return sd;
}
private static void createStorageDirs(DataStorage storage, Configuration conf, private static void createStorageDirs(DataStorage storage, Configuration conf,
int numDirs) throws IOException { int numDirs) throws IOException {
List<Storage.StorageDirectory> dirs = List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>(); new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>(); List<String> dirStrings = new ArrayList<String>();
for (int i = 0; i < numDirs; i++) { for (int i = 0; i < numDirs; i++) {
String loc = BASE_DIR + "/data" + i; File loc = new File(BASE_DIR + "/data" + i);
dirStrings.add(loc); dirStrings.add(loc.toString());
dirs.add(new Storage.StorageDirectory(new File(loc))); loc.mkdirs();
dirs.add(createStorageDirectory(loc));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
} }
@ -66,14 +93,19 @@ public class TestFsDatasetImpl {
public void setUp() throws IOException { public void setUp() throws IOException {
final DataNode datanode = Mockito.mock(DataNode.class); final DataNode datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class); storage = Mockito.mock(DataStorage.class);
Configuration conf = new Configuration(); scanner = Mockito.mock(DataBlockScanner.class);
this.conf = new Configuration();
final DNConf dnConf = new DNConf(conf); final DNConf dnConf = new DNConf(conf);
when(datanode.getConf()).thenReturn(conf); when(datanode.getConf()).thenReturn(conf);
when(datanode.getDnConf()).thenReturn(dnConf); when(datanode.getDnConf()).thenReturn(dnConf);
when(datanode.getBlockScanner()).thenReturn(scanner);
createStorageDirs(storage, conf, NUM_INIT_VOLUMES); createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf); dataset = new FsDatasetImpl(datanode, storage, conf);
for (String bpid : BLOCK_POOL_IDS) {
dataset.addBlockPool(bpid, conf);
}
assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size()); assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
assertEquals(0, dataset.getNumFailedVolumes()); assertEquals(0, dataset.getNumFailedVolumes());
@ -89,15 +121,63 @@ public class TestFsDatasetImpl {
String path = BASE_DIR + "/newData" + i; String path = BASE_DIR + "/newData" + i;
newLocations.add(StorageLocation.parse(path)); newLocations.add(StorageLocation.parse(path));
when(storage.getStorageDir(numExistingVolumes + i)) when(storage.getStorageDir(numExistingVolumes + i))
.thenReturn(new Storage.StorageDirectory(new File(path))); .thenReturn(createStorageDirectory(new File(path)));
} }
when(storage.getNumStorageDirs()).thenReturn(totalVolumes); when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
dataset.addVolumes(newLocations); dataset.addVolumes(newLocations);
assertEquals(totalVolumes, dataset.getVolumes().size()); assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
for (int i = 0; i < numNewVolumes; i++) { for (int i = 0; i < numNewVolumes; i++) {
assertEquals(newLocations.get(i).getFile().getPath(), assertEquals(newLocations.get(i).getFile().getPath(),
dataset.getVolumes().get(numExistingVolumes + i).getBasePath()); dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
} }
} }
@Test
public void testRemoveVolumes() throws IOException {
// Feed FsDataset with block metadata.
final int NUM_BLOCKS = 100;
for (int i = 0; i < NUM_BLOCKS; i++) {
String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
ExtendedBlock eb = new ExtendedBlock(bpid, i);
dataset.createRbw(StorageType.DEFAULT, eb);
}
final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0];
List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
dataset.removeVolumes(volumesToRemove);
int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.",
expectedNumVolumes, dataset.getVolumes().size());
assertEquals("The volume has been removed from the storageMap.",
expectedNumVolumes, dataset.storageMap.size());
try {
dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
new Runnable() {
@Override
public void run() {}
});
fail("Expect RuntimeException: the volume has been removed from the "
+ "AsyncDiskService.");
} catch (RuntimeException e) {
GenericTestUtils.assertExceptionContains("Cannot find root", e);
}
int totalNumReplicas = 0;
for (String bpid : dataset.volumeMap.getBlockPoolList()) {
totalNumReplicas += dataset.volumeMap.size(bpid);
}
assertEquals("The replica infos on this volume has been removed from the "
+ "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
totalNumReplicas);
// Verify that every BlockPool deletes the removed blocks from the volume.
verify(scanner, times(BLOCK_POOL_IDS.length))
.deleteBlocks(anyString(), any(Block[].class));
}
} }