HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via Colin P. McCabe)
(cherry picked from commit a17584936c
)
This commit is contained in:
parent
7b5aea8175
commit
7779f38e68
|
@ -466,6 +466,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7641. Update archival storage user doc for list/set/get block storage
|
HDFS-7641. Update archival storage user doc for list/set/get block storage
|
||||||
policies. (yliu)
|
policies. (yliu)
|
||||||
|
|
||||||
|
HDFS-7610. Fix removal of dynamically added DN volumes (Lei (Eddy) Xu via
|
||||||
|
Colin P. McCabe)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -336,7 +336,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
StorageType storageType = location.getStorageType();
|
StorageType storageType = location.getStorageType();
|
||||||
final FsVolumeImpl fsVolume = new FsVolumeImpl(
|
final FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
this, sd.getStorageUuid(), sd.getCurrentDir(), this.conf, storageType);
|
||||||
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
|
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
|
||||||
ArrayList<IOException> exceptions = Lists.newArrayList();
|
ArrayList<IOException> exceptions = Lists.newArrayList();
|
||||||
|
|
||||||
|
@ -379,19 +379,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
|
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
|
||||||
Set<File> volumeSet = new HashSet<File>();
|
Set<String> volumeSet = new HashSet<>();
|
||||||
for (StorageLocation sl : volumes) {
|
for (StorageLocation sl : volumes) {
|
||||||
volumeSet.add(sl.getFile());
|
volumeSet.add(sl.getFile().getAbsolutePath());
|
||||||
}
|
}
|
||||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||||
if (volumeSet.contains(sd.getRoot())) {
|
String volume = sd.getRoot().getAbsolutePath();
|
||||||
String volume = sd.getRoot().toString();
|
if (volumeSet.contains(volume)) {
|
||||||
LOG.info("Removing " + volume + " from FsDataset.");
|
LOG.info("Removing " + volume + " from FsDataset.");
|
||||||
|
|
||||||
// Disable the volume from the service.
|
// Disable the volume from the service.
|
||||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||||
this.volumes.removeVolume(volume);
|
this.volumes.removeVolume(sd.getRoot());
|
||||||
|
|
||||||
// Removed all replica information for the blocks on the volume. Unlike
|
// Removed all replica information for the blocks on the volume. Unlike
|
||||||
// updating the volumeMap in addVolume(), this operation does not scan
|
// updating the volumeMap in addVolume(), this operation does not scan
|
||||||
|
@ -401,7 +401,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
||||||
it.hasNext(); ) {
|
it.hasNext(); ) {
|
||||||
ReplicaInfo block = it.next();
|
ReplicaInfo block = it.next();
|
||||||
if (block.getVolume().getBasePath().equals(volume)) {
|
String absBasePath =
|
||||||
|
new File(block.getVolume().getBasePath()).getAbsolutePath();
|
||||||
|
if (absBasePath.equals(volume)) {
|
||||||
invalidate(bpid, block);
|
invalidate(bpid, block);
|
||||||
blocks.add(block);
|
blocks.add(block);
|
||||||
it.remove();
|
it.remove();
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -276,13 +277,16 @@ class FsVolumeList {
|
||||||
* Dynamically remove volume in the list.
|
* Dynamically remove volume in the list.
|
||||||
* @param volume the volume to be removed.
|
* @param volume the volume to be removed.
|
||||||
*/
|
*/
|
||||||
void removeVolume(String volume) {
|
void removeVolume(File volume) {
|
||||||
// Make a copy of volumes to remove one volume.
|
// Make a copy of volumes to remove one volume.
|
||||||
final FsVolumeImpl[] curVolumes = volumes.get();
|
final FsVolumeImpl[] curVolumes = volumes.get();
|
||||||
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
|
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
|
||||||
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
|
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
|
||||||
FsVolumeImpl fsVolume = it.next();
|
FsVolumeImpl fsVolume = it.next();
|
||||||
if (fsVolume.getBasePath().equals(volume)) {
|
String basePath, targetPath;
|
||||||
|
basePath = new File(fsVolume.getBasePath()).getAbsolutePath();
|
||||||
|
targetPath = volume.getAbsolutePath();
|
||||||
|
if (basePath.equals(targetPath)) {
|
||||||
// Make sure the removed volume is the one in the curVolumes.
|
// Make sure the removed volume is the one in the curVolumes.
|
||||||
removeVolume(fsVolume);
|
removeVolume(fsVolume);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.mockito.stubbing.Answer;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -69,6 +70,7 @@ public class TestFsDatasetImpl {
|
||||||
private static final String BASE_DIR =
|
private static final String BASE_DIR =
|
||||||
new FileSystemTestHelper().getTestRootDir();
|
new FileSystemTestHelper().getTestRootDir();
|
||||||
private static final int NUM_INIT_VOLUMES = 2;
|
private static final int NUM_INIT_VOLUMES = 2;
|
||||||
|
private static final String CLUSTER_ID = "cluser-id";
|
||||||
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
|
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
|
||||||
|
|
||||||
// Use to generate storageUuid
|
// Use to generate storageUuid
|
||||||
|
@ -135,10 +137,11 @@ public class TestFsDatasetImpl {
|
||||||
Set<String> expectedVolumes = new HashSet<String>();
|
Set<String> expectedVolumes = new HashSet<String>();
|
||||||
List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
||||||
for (String bpid : BLOCK_POOL_IDS) {
|
for (String bpid : BLOCK_POOL_IDS) {
|
||||||
nsInfos.add(new NamespaceInfo(0, "cluster-id", bpid, 1));
|
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
|
||||||
}
|
}
|
||||||
for (int i = 0; i < numNewVolumes; i++) {
|
for (int i = 0; i < numNewVolumes; i++) {
|
||||||
String path = BASE_DIR + "/newData" + i;
|
String path = BASE_DIR + "/newData" + i;
|
||||||
|
expectedVolumes.add(path);
|
||||||
StorageLocation loc = StorageLocation.parse(path);
|
StorageLocation loc = StorageLocation.parse(path);
|
||||||
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
|
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
|
||||||
DataStorage.VolumeBuilder builder =
|
DataStorage.VolumeBuilder builder =
|
||||||
|
@ -155,7 +158,8 @@ public class TestFsDatasetImpl {
|
||||||
|
|
||||||
Set<String> actualVolumes = new HashSet<String>();
|
Set<String> actualVolumes = new HashSet<String>();
|
||||||
for (int i = 0; i < numNewVolumes; i++) {
|
for (int i = 0; i < numNewVolumes; i++) {
|
||||||
dataset.getVolumes().get(numExistingVolumes + i).getBasePath();
|
actualVolumes.add(
|
||||||
|
dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
|
||||||
}
|
}
|
||||||
assertEquals(actualVolumes, expectedVolumes);
|
assertEquals(actualVolumes, expectedVolumes);
|
||||||
}
|
}
|
||||||
|
@ -207,6 +211,33 @@ public class TestFsDatasetImpl {
|
||||||
.deleteBlocks(anyString(), any(Block[].class));
|
.deleteBlocks(anyString(), any(Block[].class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testRemoveNewlyAddedVolume() throws IOException {
|
||||||
|
final int numExistingVolumes = dataset.getVolumes().size();
|
||||||
|
List<NamespaceInfo> nsInfos = new ArrayList<>();
|
||||||
|
for (String bpid : BLOCK_POOL_IDS) {
|
||||||
|
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
|
||||||
|
}
|
||||||
|
String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
|
||||||
|
StorageLocation loc = StorageLocation.parse(newVolumePath);
|
||||||
|
|
||||||
|
Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
|
||||||
|
DataStorage.VolumeBuilder builder =
|
||||||
|
new DataStorage.VolumeBuilder(storage, sd);
|
||||||
|
when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
|
||||||
|
anyListOf(NamespaceInfo.class)))
|
||||||
|
.thenReturn(builder);
|
||||||
|
|
||||||
|
dataset.addVolume(loc, nsInfos);
|
||||||
|
assertEquals(numExistingVolumes + 1, dataset.getVolumes().size());
|
||||||
|
|
||||||
|
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
|
||||||
|
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
|
||||||
|
List<StorageLocation> volumesToRemove = Arrays.asList(loc);
|
||||||
|
dataset.removeVolumes(volumesToRemove);
|
||||||
|
assertEquals(numExistingVolumes, dataset.getVolumes().size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
public void testChangeVolumeWithRunningCheckDirs() throws IOException {
|
public void testChangeVolumeWithRunningCheckDirs() throws IOException {
|
||||||
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
|
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
|
||||||
|
@ -231,7 +262,7 @@ public class TestFsDatasetImpl {
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocationOnMock)
|
public Object answer(InvocationOnMock invocationOnMock)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
volumeList.removeVolume("data4");
|
volumeList.removeVolume(new File("data4"));
|
||||||
volumeList.addVolume(newVolume);
|
volumeList.addVolume(newVolume);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue