diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f71856499ce..e9244a4128f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -11,6 +11,9 @@ Release 2.6.1 - UNRELEASED HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe) + HDFS-7531. Improve the concurrent access on FsVolumeList (Lei Xu via Colin + P. McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index e7fa6d7a036..0d9f096077e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -127,7 +127,7 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public List getVolumes() { - return volumes.volumes; + return volumes.getVolumes(); } @Override @@ -140,9 +140,10 @@ class FsDatasetImpl implements FsDatasetSpi { throws IOException { StorageReport[] reports; synchronized (statsLock) { - reports = new StorageReport[volumes.volumes.size()]; + List curVolumes = getVolumes(); + reports = new StorageReport[curVolumes.size()]; int i = 0; - for (FsVolumeImpl volume : volumes.volumes) { + for (FsVolumeImpl volume : curVolumes) { reports[i++] = new StorageReport(volume.toDatanodeStorage(), false, volume.getCapacity(), @@ -1322,7 +1323,8 @@ class FsDatasetImpl implements FsDatasetSpi { Map> uc = new HashMap>(); - for (FsVolumeSpi v : volumes.volumes) { + List curVolumes = getVolumes(); + for (FsVolumeSpi v : curVolumes) { finalized.put(v.getStorageID(), new ArrayList()); 
uc.put(v.getStorageID(), new ArrayList()); } @@ -1349,7 +1351,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } - for (FsVolumeSpi v : volumes.volumes) { + for (FsVolumeImpl v : curVolumes) { ArrayList finalizedList = finalized.get(v.getStorageID()); ArrayList ucList = uc.get(v.getStorageID()); blockReportsMap.put(((FsVolumeImpl) v).toDatanodeStorage(), @@ -2222,7 +2224,7 @@ class FsDatasetImpl implements FsDatasetSpi { private Collection getVolumeInfo() { Collection info = new ArrayList(); - for (FsVolumeImpl volume : volumes.volumes) { + for (FsVolumeImpl volume : getVolumes()) { long used = 0; long free = 0; try { @@ -2256,8 +2258,9 @@ class FsDatasetImpl implements FsDatasetSpi { @Override //FsDatasetSpi public synchronized void deleteBlockPool(String bpid, boolean force) throws IOException { + List curVolumes = getVolumes(); if (!force) { - for (FsVolumeImpl volume : volumes.volumes) { + for (FsVolumeImpl volume : curVolumes) { if (!volume.isBPDirEmpty(bpid)) { LOG.warn(bpid + " has some block files, cannot delete unless forced"); throw new IOException("Cannot delete block pool, " @@ -2265,7 +2268,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } } - for (FsVolumeImpl volume : volumes.volumes) { + for (FsVolumeImpl volume : curVolumes) { volume.deleteBPDirectories(bpid, force); } } @@ -2283,13 +2286,14 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, long[] blockIds) throws IOException { + List curVolumes = getVolumes(); // List of VolumeIds, one per volume on the datanode - List blocksVolumeIds = new ArrayList(volumes.volumes.size()); + List blocksVolumeIds = new ArrayList(curVolumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on List blocksVolumeIndexes = new ArrayList(blockIds.length); // Initialize the list of VolumeIds simply by enumerating the volumes - for (int i = 0; i < 
volumes.volumes.size(); i++) { + for (int i = 0; i < curVolumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing @@ -2302,7 +2306,7 @@ class FsDatasetImpl implements FsDatasetSpi { int volumeIndex = 0; if (info != null) { FsVolumeSpi blockVolume = info.getVolume(); - for (FsVolumeImpl volume : volumes.volumes) { + for (FsVolumeImpl volume : curVolumes) { // This comparison of references should be safe if (blockVolume == volume) { isValid = true; @@ -2526,7 +2530,7 @@ class FsDatasetImpl implements FsDatasetSpi { // Don't worry about fragmentation for now. We don't expect more than one // transient volume per DN. - for (FsVolumeImpl v : volumes.volumes) { + for (FsVolumeImpl v : getVolumes()) { if (v.isTransientStorage()) { capacity += v.getCapacity(); free += v.getAvailable(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 55329aea7d9..94834449d83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -31,11 +34,8 @@ import 
org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; class FsVolumeList { - /** - * Read access to this unmodifiable list is not synchronized. - * This list is replaced on modification holding "this" lock. - */ - volatile List volumes = null; + private final AtomicReference volumes = + new AtomicReference(new FsVolumeImpl[0]); private Object checkDirsMutex = new Object(); private final VolumeChoosingPolicy blockChooser; @@ -50,19 +50,28 @@ class FsVolumeList { int numberOfFailedVolumes() { return numFailedVolumes; } - + + /** + * Return an immutable list view of all the volumes. + */ + List getVolumes() { + return Collections.unmodifiableList(Arrays.asList(volumes.get())); + } + /** - * Get next volume. Synchronized to ensure {@link #curVolume} is updated - * by a single thread and next volume is chosen with no concurrent - * update to {@link #volumes}. + * Get next volume. + * * @param blockSize free space needed on the volume * @param storageType the desired {@link StorageType} * @return next volume to store the block in. */ - synchronized FsVolumeImpl getNextVolume(StorageType storageType, - long blockSize) throws IOException { - final List list = new ArrayList(volumes.size()); - for(FsVolumeImpl v : volumes) { + FsVolumeImpl getNextVolume(StorageType storageType, long blockSize) + throws IOException { + // Get a snapshot of currently available volumes. + final FsVolumeImpl[] curVolumes = volumes.get(); + final List list = + new ArrayList(curVolumes.length); + for(FsVolumeImpl v : curVolumes) { if (v.getStorageType() == storageType) { list.add(v); } @@ -71,16 +80,17 @@ class FsVolumeList { } /** - * Get next volume. Synchronized to ensure {@link #curVolume} is updated - * by a single thread and next volume is chosen with no concurrent - * update to {@link #volumes}. + * Get next volume. + * * @param blockSize free space needed on the volume * @return next volume to store the block in. 
*/ - synchronized FsVolumeImpl getNextTransientVolume( - long blockSize) throws IOException { - final List list = new ArrayList(volumes.size()); - for(FsVolumeImpl v : volumes) { + FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException { + // Get a snapshot of currently available volumes. + final List curVolumes = getVolumes(); + final List list = + new ArrayList(curVolumes.size()); + for(FsVolumeImpl v : curVolumes) { if (v.isTransientStorage()) { list.add(v); } @@ -90,7 +100,7 @@ class FsVolumeList { long getDfsUsed() throws IOException { long dfsUsed = 0L; - for (FsVolumeImpl v : volumes) { + for (FsVolumeImpl v : volumes.get()) { dfsUsed += v.getDfsUsed(); } return dfsUsed; @@ -98,7 +108,7 @@ class FsVolumeList { long getBlockPoolUsed(String bpid) throws IOException { long dfsUsed = 0L; - for (FsVolumeImpl v : volumes) { + for (FsVolumeImpl v : volumes.get()) { dfsUsed += v.getBlockPoolUsed(bpid); } return dfsUsed; @@ -106,7 +116,7 @@ class FsVolumeList { long getCapacity() { long capacity = 0L; - for (FsVolumeImpl v : volumes) { + for (FsVolumeImpl v : volumes.get()) { capacity += v.getCapacity(); } return capacity; @@ -114,7 +124,7 @@ class FsVolumeList { long getRemaining() throws IOException { long remaining = 0L; - for (FsVolumeSpi vol : volumes) { + for (FsVolumeSpi vol : volumes.get()) { remaining += vol.getAvailable(); } return remaining; @@ -128,7 +138,7 @@ class FsVolumeList { final List exceptions = Collections.synchronizedList( new ArrayList()); List replicaAddingThreads = new ArrayList(); - for (final FsVolumeImpl v : volumes) { + for (final FsVolumeImpl v : volumes.get()) { Thread t = new Thread() { public void run() { try { @@ -177,7 +187,7 @@ class FsVolumeList { ArrayList removedVols = null; // Make a copy of volumes for performing modification - final List volumeList = new ArrayList(volumes); + final List volumeList = getVolumes(); for(Iterator i = volumeList.iterator(); i.hasNext(); ) { final FsVolumeImpl fsv = i.next(); @@ 
-189,7 +199,7 @@ class FsVolumeList { removedVols = new ArrayList(1); } removedVols.add(fsv); - removeVolume(fsv.getBasePath()); + removeVolume(fsv); numFailedVolumes++; } } @@ -212,31 +222,71 @@ class FsVolumeList { * Dynamically add new volumes to the existing volumes that this DN manages. * @param newVolume the instance of new FsVolumeImpl. */ - synchronized void addVolume(FsVolumeImpl newVolume) { + void addVolume(FsVolumeImpl newVolume) { // Make a copy of volumes to add new volumes. - final List volumeList = volumes == null ? - new ArrayList() : - new ArrayList(volumes); - volumeList.add(newVolume); - volumes = Collections.unmodifiableList(volumeList); + while (true) { + final FsVolumeImpl[] curVolumes = volumes.get(); + final List volumeList = Lists.newArrayList(curVolumes); + volumeList.add(newVolume); + if (volumes.compareAndSet(curVolumes, + volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { + break; + } else { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug( + "The volume list has been changed concurrently, " + + "retry to remove volume: " + newVolume); + } + } + } + FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString()); } /** - * Dynamically remove volume to the list. + * Dynamically remove a volume in the list. + * @param target the volume instance to be removed. 
+ */ + private void removeVolume(FsVolumeImpl target) { + while (true) { + final FsVolumeImpl[] curVolumes = volumes.get(); + final List volumeList = Lists.newArrayList(curVolumes); + if (volumeList.remove(target)) { + if (volumes.compareAndSet(curVolumes, + volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { + target.shutdown(); + FsDatasetImpl.LOG.info("Removed volume: " + target); + break; + } else { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug( + "The volume list has been changed concurrently, " + + "retry to remove volume: " + target); + } + } + } else { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Volume " + target + + " does not exist or is removed by others."); + } + break; + } + } + } + + /** + * Dynamically remove volume in the list. * @param volume the volume to be removed. */ - synchronized void removeVolume(String volume) { + void removeVolume(String volume) { // Make a copy of volumes to remove one volume. - final List volumeList = new ArrayList(volumes); + final FsVolumeImpl[] curVolumes = volumes.get(); + final List volumeList = Lists.newArrayList(curVolumes); for (Iterator it = volumeList.iterator(); it.hasNext(); ) { FsVolumeImpl fsVolume = it.next(); if (fsVolume.getBasePath().equals(volume)) { - fsVolume.shutdown(); - it.remove(); - volumes = Collections.unmodifiableList(volumeList); - FsDatasetImpl.LOG.info("Removed volume: " + volume); - break; + // Make sure the removed volume is the one in the curVolumes. 
+ removeVolume(fsVolume); } } } @@ -247,7 +297,7 @@ class FsVolumeList { final List exceptions = Collections.synchronizedList( new ArrayList()); List blockPoolAddingThreads = new ArrayList(); - for (final FsVolumeImpl v : volumes) { + for (final FsVolumeImpl v : volumes.get()) { Thread t = new Thread() { public void run() { try { @@ -285,13 +335,13 @@ class FsVolumeList { } void removeBlockPool(String bpid) { - for (FsVolumeImpl v : volumes) { + for (FsVolumeImpl v : volumes.get()) { v.shutdownBlockPool(bpid); } } void shutdown() { - for (FsVolumeImpl volume : volumes) { + for (FsVolumeImpl volume : volumes.get()) { if(volume != null) { volume.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 28951c3a1da..60fc238ace5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -34,12 +34,15 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.File; import java.io.IOException; @@ -49,13 
+52,19 @@ import java.util.List; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -102,9 +111,9 @@ public class TestFsDatasetImpl { @Before public void setUp() throws IOException { - datanode = Mockito.mock(DataNode.class); - storage = Mockito.mock(DataStorage.class); - scanner = Mockito.mock(DataBlockScanner.class); + datanode = mock(DataNode.class); + storage = mock(DataStorage.class); + scanner = mock(DataBlockScanner.class); this.conf = new Configuration(); final DNConf dnConf = new DNConf(conf); @@ -204,8 +213,8 @@ public class TestFsDatasetImpl { @Test public void testDuplicateReplicaResolution() throws IOException { - FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); - FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class); + FsVolumeImpl fsv1 = mock(FsVolumeImpl.class); + FsVolumeImpl fsv2 = mock(FsVolumeImpl.class); File f1 = new File("d1/block"); File f2 = new File("d2/block"); @@ -232,4 +241,53 @@ public class TestFsDatasetImpl { assertSame(replica, BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); } + + @Test(timeout = 5000) + public void testChangeVolumeWithRunningCheckDirs() throws IOException { + RoundRobinVolumeChoosingPolicy blockChooser = + new RoundRobinVolumeChoosingPolicy(); + final FsVolumeList volumeList = new FsVolumeList(0, 
blockChooser); + final List oldVolumes = new ArrayList(); + + // Initialize FsVolumeList with 5 mock volumes. + final int NUM_VOLUMES = 5; + for (int i = 0; i < NUM_VOLUMES; i++) { + FsVolumeImpl volume = mock(FsVolumeImpl.class); + oldVolumes.add(volume); + when(volume.getBasePath()).thenReturn("data" + i); + volumeList.addVolume(volume); + } + + // When checkDirs() is called on the 2nd volume, another "thread" removes the 5th + // volume and adds another volume. It does not affect checkDirs() running. + final FsVolumeImpl newVolume = mock(FsVolumeImpl.class); + FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + volumeList.removeVolume("data4"); + volumeList.addVolume(newVolume); + return null; + } + }).when(blockedVolume).checkDirs(); + + FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2); + doThrow(new DiskChecker.DiskErrorException("broken")) + .when(brokenVolume).checkDirs(); + + volumeList.checkDirs(); + + // Since FsVolumeList#checkDirs() gets a snapshot of the list of volumes + // before running removeVolume(), it is supposed to run checkDirs() on all + // the old volumes. + for (FsVolumeImpl volume : oldVolumes) { + verify(volume).checkDirs(); + } + // New volume is not visible to checkDirs() process. + verify(newVolume, never()).checkDirs(); + assertTrue(volumeList.getVolumes().contains(newVolume)); + assertFalse(volumeList.getVolumes().contains(brokenVolume)); + assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size()); + } }