diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ee9dcc2b4f9..1141df14e8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2667,6 +2667,8 @@ Release 2.8.0 - UNRELEASED HDFS-9210. Fix some misuse of %n in VolumeScanner#printStats. (Xiaoyu Yao) + HDFS-9701. DN may deadlock when hot-swapping under load. (Xiao Chen via lei) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index cbd412305b3..bf873463c37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -493,6 +493,7 @@ public void removeVolumes(Set volumesToRemove, boolean clearFailure) { // Disable the volume from the service. asyncDiskService.removeVolume(sd.getCurrentDir()); volumes.removeVolume(absRoot, clearFailure); + volumes.waitVolumeRemoved(5000, this); // Removed all replica information for the blocks on the volume. // Unlike updating the volumeMap in addVolume(), this operation does @@ -1769,6 +1770,7 @@ public synchronized List getFinalizedBlocksOnPersistentStorage * * @throws IOException May be thrown from the methods called. */ + @Override // FsDatasetSpi public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) throws ReplicaNotFoundException, UnexpectedReplicaStateException, FileNotFoundException, EOFException, IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 6b790737514..86f03c2d435 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -235,29 +235,30 @@ private void checkReference() { } /** - * Close this volume and wait all other threads to release the reference count - * on this volume. - * @throws IOException if the volume is closed or the waiting is interrupted. + * Close this volume. + * @throws IOException if the volume is closed. */ - void closeAndWait() throws IOException { + void setClosed() throws IOException { try { this.reference.setClosed(); } catch (ClosedChannelException e) { throw new IOException("The volume has already closed.", e); } - final int SLEEP_MILLIS = 500; - while (this.reference.getReferenceCount() > 0) { + } + + /** + * Check whether this volume has successfully been closed. + */ + boolean checkClosed() { + if (this.reference.getReferenceCount() > 0) { if (FsDatasetImpl.LOG.isDebugEnabled()) { FsDatasetImpl.LOG.debug(String.format( "The reference count for %s is %d, wait to be 0.", this, reference.getReferenceCount())); } - try { - Thread.sleep(SLEEP_MILLIS); - } catch (InterruptedException e) { - throw new IOException(e); - } + return false; } + return true; } File getCurrentDir() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 608ee291b41..ea4d5975cd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -49,6 +50,8 @@ class FsVolumeList { // Tracks volume failures, sorted by volume path. private final Map volumeFailureInfos = Collections.synchronizedMap(new TreeMap()); + private final ConcurrentLinkedQueue volumesBeingRemoved = + new ConcurrentLinkedQueue<>(); private Object checkDirsMutex = new Object(); private final VolumeChoosingPolicy blockChooser; @@ -257,10 +260,33 @@ Set checkDirs() { + " failure volumes."); } + waitVolumeRemoved(5000, checkDirsMutex); return failedVols; } } + /** + * Wait for the reference of the volume removed from a previous + * {@link #removeVolume(FsVolumeImpl)} call to be released. + * + * @param sleepMillis interval to recheck. + */ + void waitVolumeRemoved(int sleepMillis, Object monitor) { + while (!checkVolumesRemoved()) { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Waiting for volume reference to be released."); + } + try { + monitor.wait(sleepMillis); + } catch (InterruptedException e) { + FsDatasetImpl.LOG.info("Thread interrupted when waiting for " + + "volume reference to be released."); + Thread.currentThread().interrupt(); + } + } + FsDatasetImpl.LOG.info("Volume reference is released."); + } + @Override public String toString() { return volumes.toString(); @@ -298,12 +324,13 @@ private void removeVolume(FsVolumeImpl target) { blockScanner.removeVolumeScanner(target); } try { - target.closeAndWait(); + target.setClosed(); } catch (IOException e) { FsDatasetImpl.LOG.warn( "Error occurs when waiting volume to close: " + target, e); } target.shutdown(); + volumesBeingRemoved.add(target); FsDatasetImpl.LOG.info("Removed volume: " + target); } else { if (FsDatasetImpl.LOG.isDebugEnabled()) { @@ -336,6 +363,24 @@ VolumeFailureInfo[] getVolumeFailureInfos() { return infos.toArray(new VolumeFailureInfo[infos.size()]); } + /** + * Check whether the reference of the volume from a previous + * {@link #removeVolume(FsVolumeImpl)} call is released. + * + * @return Whether the reference is released. + */ + boolean checkVolumesRemoved() { + Iterator it = volumesBeingRemoved.iterator(); + while (it.hasNext()) { + FsVolumeImpl volume = it.next(); + if (!volume.checkClosed()) { + return false; + } + it.remove(); + } + return true; + } + void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) { volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(), volumeFailureInfo); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 261a8b0ed0f..22a00738736 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; @@ -65,6 +66,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -89,8 +91,11 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestFsDatasetImpl { + Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class); private static final String BASE_DIR = new FileSystemTestHelper().getTestRootDir(); private static final int NUM_INIT_VOLUMES = 2; @@ -119,6 +124,7 @@ private static void createStorageDirs(DataStorage storage, Configuration conf, List dirs = new ArrayList(); List dirStrings = new ArrayList(); + FileUtils.deleteDirectory(new File(BASE_DIR)); for (int i = 0; i < numDirs; i++) { File loc = new File(BASE_DIR + "/data" + i); dirStrings.add(new Path(loc.toString()).toUri().toString()); @@ -296,6 +302,7 @@ public void testChangeVolumeWithRunningCheckDirs() throws IOException { FsVolumeImpl volume = mock(FsVolumeImpl.class); oldVolumes.add(volume); when(volume.getBasePath()).thenReturn("data" + i); + when(volume.checkClosed()).thenReturn(true); FsVolumeReference ref = mock(FsVolumeReference.class); when(ref.getVolume()).thenReturn(volume); volumeList.addVolume(ref); @@ -541,4 +548,52 @@ private long getDfsUsedValueOfNewVolume(long cacheDfsUsed, return dfsUsed; } + + @Test(timeout = 30000) + public void testRemoveVolumeBeingWritten() throws Exception { + // Will write and remove on dn0. + final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0); + final CountDownLatch startFinalizeLatch = new CountDownLatch(1); + final CountDownLatch brReceivedLatch = new CountDownLatch(1); + class BlockReportThread extends Thread { + public void run() { + LOG.info("Getting block report"); + dataset.getBlockReports(eb.getBlockPoolId()); + LOG.info("Successfully received block report"); + brReceivedLatch.countDown(); + } + } + + final BlockReportThread brt = new BlockReportThread(); + class ResponderThread extends Thread { + public void run() { + try (ReplicaHandler replica = dataset + .createRbw(StorageType.DEFAULT, eb, false)) { + LOG.info("createRbw finished"); + startFinalizeLatch.countDown(); + + // Slow down while we're holding the reference to the volume + Thread.sleep(1000); + dataset.finalizeBlock(eb); + LOG.info("finalizeBlock finished"); + } catch (Exception e) { + LOG.warn("Exception caught. This should not affect the test", e); + } + } + } + + ResponderThread res = new ResponderThread(); + res.start(); + startFinalizeLatch.await(); + + Set volumesToRemove = new HashSet<>(); + volumesToRemove.add( + StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile()); + LOG.info("Removing volume " + volumesToRemove); + // Verify block report can be received during this + brt.start(); + dataset.removeVolumes(volumesToRemove, true); + LOG.info("Volumes removed"); + brReceivedLatch.await(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 9b9b6927b4d..e24c72541d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.StorageType; @@ -25,6 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; import org.junit.Test; @@ -33,9 +35,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; public class TestFsVolumeList { @@ -57,11 +61,11 @@ public void setUp() { blockScanner = new BlockScanner(null, blockScannerConf); } - @Test + @Test(timeout=30000) public void testGetNextVolumeWithClosedVolume() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.emptyList(), blockScanner, blockChooser); - List volumes = new ArrayList<>(); + final List volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "nextvolume-" + i); curDir.mkdirs(); @@ -73,7 +77,19 @@ public void testGetNextVolumeWithClosedVolume() throws IOException { } // Close the second volume. - volumes.get(1).closeAndWait(); + volumes.get(1).setClosed(); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return volumes.get(1).checkClosed(); + } + }, 100, 3000); + } catch (TimeoutException e) { + fail("timed out while waiting for volume to be removed."); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } for (int i = 0; i < 10; i++) { try (FsVolumeReference ref = volumeList.getNextVolume(StorageType.DEFAULT, 128)) { @@ -83,11 +99,11 @@ public void testGetNextVolumeWithClosedVolume() throws IOException { } } - @Test + @Test(timeout=30000) public void testCheckDirsWithClosedVolume() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.emptyList(), blockScanner, blockChooser); - List volumes = new ArrayList<>(); + final List volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "volume-" + i); curDir.mkdirs(); @@ -98,12 +114,24 @@ public void testCheckDirsWithClosedVolume() throws IOException { } // Close the 2nd volume. - volumes.get(1).closeAndWait(); + volumes.get(1).setClosed(); + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return volumes.get(1).checkClosed(); + } + }, 100, 3000); + } catch (TimeoutException e) { + fail("timed out while waiting for volume to be removed."); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } // checkDirs() should ignore the 2nd volume since it is closed. volumeList.checkDirs(); } - @Test + @Test(timeout=30000) public void testReleaseVolumeRefIfNoBlockScanner() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.emptyList(), null, blockChooser);