HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin P. McCabe)

(cherry picked from commit 3b173d9517)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

(cherry picked from commit dda1fc169d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

(cherry picked from commit ba28192f9d5a8385283bd717bca494e6981d378f)
This commit is contained in:
Colin Patrick Mccabe 2014-12-17 16:41:59 -08:00 committed by Vinod Kumar Vavilapalli
parent 570d52e53c
commit c2e00e0012
4 changed files with 177 additions and 62 deletions

View File

@ -11,6 +11,9 @@ Release 2.6.1 - UNRELEASED
HDFS-7035. Make adding a new data directory to the DataNode an atomic
operation and improve error handling (Lei Xu via Colin P. McCabe)
HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin
P. McCabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -127,7 +127,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public List<FsVolumeImpl> getVolumes() {
return volumes.volumes;
return volumes.getVolumes();
}
@Override
@ -140,9 +140,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
StorageReport[] reports;
synchronized (statsLock) {
reports = new StorageReport[volumes.volumes.size()];
List<FsVolumeImpl> curVolumes = getVolumes();
reports = new StorageReport[curVolumes.size()];
int i = 0;
for (FsVolumeImpl volume : volumes.volumes) {
for (FsVolumeImpl volume : curVolumes) {
reports[i++] = new StorageReport(volume.toDatanodeStorage(),
false,
volume.getCapacity(),
@ -1322,7 +1323,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Map<String, ArrayList<ReplicaInfo>> uc =
new HashMap<String, ArrayList<ReplicaInfo>>();
for (FsVolumeSpi v : volumes.volumes) {
List<FsVolumeImpl> curVolumes = getVolumes();
for (FsVolumeSpi v : curVolumes) {
finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
}
@ -1349,7 +1351,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
for (FsVolumeSpi v : volumes.volumes) {
for (FsVolumeImpl v : curVolumes) {
ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
blockReportsMap.put(((FsVolumeImpl) v).toDatanodeStorage(),
@ -2222,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
for (FsVolumeImpl volume : volumes.volumes) {
for (FsVolumeImpl volume : getVolumes()) {
long used = 0;
long free = 0;
try {
@ -2256,8 +2258,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override //FsDatasetSpi
public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes();
if (!force) {
for (FsVolumeImpl volume : volumes.volumes) {
for (FsVolumeImpl volume : curVolumes) {
if (!volume.isBPDirEmpty(bpid)) {
LOG.warn(bpid + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
@ -2265,7 +2268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
}
for (FsVolumeImpl volume : volumes.volumes) {
for (FsVolumeImpl volume : curVolumes) {
volume.deleteBPDirectories(bpid, force);
}
}
@ -2283,13 +2286,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
long[] blockIds) throws IOException {
List<FsVolumeImpl> curVolumes = getVolumes();
// List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(curVolumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) {
for (int i = 0; i < curVolumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
}
// Determine the index of the VolumeId of each block's volume, by comparing
@ -2302,7 +2306,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
int volumeIndex = 0;
if (info != null) {
FsVolumeSpi blockVolume = info.getVolume();
for (FsVolumeImpl volume : volumes.volumes) {
for (FsVolumeImpl volume : curVolumes) {
// This comparison of references should be safe
if (blockVolume == volume) {
isValid = true;
@ -2526,7 +2530,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN.
for (FsVolumeImpl v : volumes.volumes) {
for (FsVolumeImpl v : getVolumes()) {
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();

View File

@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -31,11 +34,8 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
class FsVolumeList {
/**
* Read access to this unmodifiable list is not synchronized.
* This list is replaced on modification holding "this" lock.
*/
volatile List<FsVolumeImpl> volumes = null;
private final AtomicReference<FsVolumeImpl[]> volumes =
new AtomicReference<FsVolumeImpl[]>(new FsVolumeImpl[0]);
private Object checkDirsMutex = new Object();
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
@ -50,19 +50,28 @@ class FsVolumeList {
int numberOfFailedVolumes() {
return numFailedVolumes;
}
/**
 * Expose the volumes currently held by this list as a read-only list.
 *
 * @return an unmodifiable list wrapping the volume array that was current
 *         at the time of the call.
 */
List<FsVolumeImpl> getVolumes() {
  final FsVolumeImpl[] snapshot = volumes.get();
  final List<FsVolumeImpl> view = Arrays.asList(snapshot);
  return Collections.unmodifiableList(view);
}
/**
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* Get next volume.
*
* @param blockSize free space needed on the volume
* @param storageType the desired {@link StorageType}
* @return next volume to store the block in.
*/
synchronized FsVolumeImpl getNextVolume(StorageType storageType,
long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
for(FsVolumeImpl v : volumes) {
FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
throws IOException {
// Get a snapshot of currently available volumes.
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> list =
new ArrayList<FsVolumeImpl>(curVolumes.length);
for(FsVolumeImpl v : curVolumes) {
if (v.getStorageType() == storageType) {
list.add(v);
}
@ -71,16 +80,17 @@ class FsVolumeList {
}
/**
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* Get next volume.
*
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
synchronized FsVolumeImpl getNextTransientVolume(
long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
for(FsVolumeImpl v : volumes) {
FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
// Get a snapshot of currently available volumes.
final List<FsVolumeImpl> curVolumes = getVolumes();
final List<FsVolumeImpl> list =
new ArrayList<FsVolumeImpl>(curVolumes.size());
for(FsVolumeImpl v : curVolumes) {
if (v.isTransientStorage()) {
list.add(v);
}
@ -90,7 +100,7 @@ class FsVolumeList {
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes) {
for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getDfsUsed();
}
return dfsUsed;
@ -98,7 +108,7 @@ class FsVolumeList {
long getBlockPoolUsed(String bpid) throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes) {
for (FsVolumeImpl v : volumes.get()) {
dfsUsed += v.getBlockPoolUsed(bpid);
}
return dfsUsed;
@ -106,7 +116,7 @@ class FsVolumeList {
long getCapacity() {
long capacity = 0L;
for (FsVolumeImpl v : volumes) {
for (FsVolumeImpl v : volumes.get()) {
capacity += v.getCapacity();
}
return capacity;
@ -114,7 +124,7 @@ class FsVolumeList {
long getRemaining() throws IOException {
long remaining = 0L;
for (FsVolumeSpi vol : volumes) {
for (FsVolumeSpi vol : volumes.get()) {
remaining += vol.getAvailable();
}
return remaining;
@ -128,7 +138,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes) {
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
@ -177,7 +187,7 @@ class FsVolumeList {
ArrayList<FsVolumeImpl> removedVols = null;
// Make a copy of volumes for performing modification
final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
final List<FsVolumeImpl> volumeList = getVolumes();
for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
final FsVolumeImpl fsv = i.next();
@ -189,7 +199,7 @@ class FsVolumeList {
removedVols = new ArrayList<FsVolumeImpl>(1);
}
removedVols.add(fsv);
removeVolume(fsv.getBasePath());
removeVolume(fsv);
numFailedVolumes++;
}
}
@ -212,31 +222,71 @@ class FsVolumeList {
* Dynamically add new volumes to the existing volumes that this DN manages.
* @param newVolume the instance of new FsVolumeImpl.
*/
synchronized void addVolume(FsVolumeImpl newVolume) {
void addVolume(FsVolumeImpl newVolume) {
// Make a copy of volumes to add new volumes.
final List<FsVolumeImpl> volumeList = volumes == null ?
new ArrayList<FsVolumeImpl>() :
new ArrayList<FsVolumeImpl>(volumes);
volumeList.add(newVolume);
volumes = Collections.unmodifiableList(volumeList);
// Build the extended array from a snapshot and publish it with
// compareAndSet; if another thread replaced the array in the meantime the
// swap fails and the loop starts over from a fresh snapshot.
while (true) {
  final FsVolumeImpl[] curVolumes = volumes.get();
  final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
  volumeList.add(newVolume);
  if (volumes.compareAndSet(curVolumes,
      volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
    break;
  }
  if (FsDatasetImpl.LOG.isDebugEnabled()) {
    // This path adds a volume, so the message says "add" (it previously
    // said "remove", copied from removeVolume).
    FsDatasetImpl.LOG.debug(
        "The volume list has been changed concurrently, " +
        "retry to add volume: " + newVolume);
  }
}
FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
}
/**
* Dynamically remove volume to the list.
* Dynamically remove a volume from the list.
* @param target the volume instance to be removed.
*/
private void removeVolume(FsVolumeImpl target) {
  // Keep trying until either the swap succeeds or the target turns out to
  // be absent from the latest snapshot.
  for (;;) {
    final FsVolumeImpl[] snapshot = volumes.get();
    final List<FsVolumeImpl> remaining = Lists.newArrayList(snapshot);

    // Nothing to do when the target is not in the current snapshot.
    if (!remaining.remove(target)) {
      if (FsDatasetImpl.LOG.isDebugEnabled()) {
        FsDatasetImpl.LOG.debug("Volume " + target +
            " does not exist or is removed by others.");
      }
      return;
    }

    final FsVolumeImpl[] replacement =
        remaining.toArray(new FsVolumeImpl[remaining.size()]);
    if (volumes.compareAndSet(snapshot, replacement)) {
      // Shut the volume down only after the new array has been published.
      target.shutdown();
      FsDatasetImpl.LOG.info("Removed volume: " + target);
      return;
    }

    // The array was replaced by someone else since the snapshot; go again.
    if (FsDatasetImpl.LOG.isDebugEnabled()) {
      FsDatasetImpl.LOG.debug(
          "The volume list has been changed concurrently, " +
          "retry to remove volume: " + target);
    }
  }
}
/**
* Dynamically remove a volume from the list.
* @param volume the base path of the volume to be removed.
*/
synchronized void removeVolume(String volume) {
void removeVolume(String volume) {
// Make a copy of volumes to remove one volume.
final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
for (Iterator<FsVolumeImpl> it = volumeList.iterator(); it.hasNext(); ) {
FsVolumeImpl fsVolume = it.next();
if (fsVolume.getBasePath().equals(volume)) {
fsVolume.shutdown();
it.remove();
volumes = Collections.unmodifiableList(volumeList);
FsDatasetImpl.LOG.info("Removed volume: " + volume);
break;
// Make sure the removed volume is the one in the curVolumes.
removeVolume(fsVolume);
}
}
}
@ -247,7 +297,7 @@ class FsVolumeList {
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes) {
for (final FsVolumeImpl v : volumes.get()) {
Thread t = new Thread() {
public void run() {
try {
@ -285,13 +335,13 @@ class FsVolumeList {
}
void removeBlockPool(String bpid) {
for (FsVolumeImpl v : volumes) {
for (FsVolumeImpl v : volumes.get()) {
v.shutdownBlockPool(bpid);
}
}
void shutdown() {
for (FsVolumeImpl volume : volumes) {
for (FsVolumeImpl volume : volumes.get()) {
if(volume != null) {
volume.shutdown();
}

View File

@ -34,12 +34,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
@ -49,13 +52,19 @@ import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -102,9 +111,9 @@ public class TestFsDatasetImpl {
@Before
public void setUp() throws IOException {
datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
scanner = Mockito.mock(DataBlockScanner.class);
datanode = mock(DataNode.class);
storage = mock(DataStorage.class);
scanner = mock(DataBlockScanner.class);
this.conf = new Configuration();
final DNConf dnConf = new DNConf(conf);
@ -204,8 +213,8 @@ public class TestFsDatasetImpl {
@Test
public void testDuplicateReplicaResolution() throws IOException {
FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class);
FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class);
FsVolumeImpl fsv1 = mock(FsVolumeImpl.class);
FsVolumeImpl fsv2 = mock(FsVolumeImpl.class);
File f1 = new File("d1/block");
File f2 = new File("d2/block");
@ -232,4 +241,53 @@ public class TestFsDatasetImpl {
assertSame(replica,
BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
}
/**
 * Changes the volume list from inside a volume's checkDirs() callback while
 * FsVolumeList#checkDirs() is running, then verifies which volumes were
 * checked and which volumes the list holds afterwards.
 */
@Test(timeout = 5000)
public void testChangeVolumeWithRunningCheckDirs() throws IOException {
  RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
      new RoundRobinVolumeChoosingPolicy<FsVolumeImpl>();
  final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
  final List<FsVolumeImpl> oldVolumes = new ArrayList<FsVolumeImpl>();

  // Initialize FsVolumeList with 5 mock volumes.
  final int NUM_VOLUMES = 5;
  for (int i = 0; i < NUM_VOLUMES; i++) {
    FsVolumeImpl volume = mock(FsVolumeImpl.class);
    oldVolumes.add(volume);
    when(volume.getBasePath()).thenReturn("data" + i);
    volumeList.addVolume(volume);
  }

  // When checkDirs() is called on the 2nd volume, another "thread" removes
  // the 5th volume and adds a new one. This must not affect the checkDirs()
  // pass that is already running.
  final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
  FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocationOnMock)
        throws Throwable {
      volumeList.removeVolume("data4");
      volumeList.addVolume(newVolume);
      return null;
    }
  }).when(blockedVolume).checkDirs();

  // The 3rd volume reports a disk error when checked.
  FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
  doThrow(new DiskChecker.DiskErrorException("broken"))
      .when(brokenVolume).checkDirs();

  volumeList.checkDirs();

  // Since FsVolumeList#checkDirs() takes a snapshot of the list of volumes
  // before removeVolume() runs, it is supposed to run checkDirs() on all
  // the old volumes.
  for (FsVolumeImpl volume : oldVolumes) {
    verify(volume).checkDirs();
  }
  // The new volume is not visible to the running checkDirs() pass.
  verify(newVolume, never()).checkDirs();
  assertTrue(volumeList.getVolumes().contains(newVolume));
  assertFalse(volumeList.getVolumes().contains(brokenVolume));
  // Five initial volumes, minus "data4", plus newVolume, minus the broken
  // volume that checkDirs() is expected to drop.
  assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
}
}