diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index d920bc63c16..d7fe93d73cf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.util;
 
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -135,4 +136,11 @@ public class AutoCloseableLock implements AutoCloseable {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * See {@link ReentrantLock#newCondition()}.
+   * @return the Condition object
+   */
+  public Condition newCondition() {
+    return lock.newCondition();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5b3ebceb488..a289f9ef356 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.*;
 import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
@@ -273,6 +274,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private final int maxDataLength;
 
   private final AutoCloseableLock datasetLock;
+  private final Condition datasetLockCondition;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -290,6 +293,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
             TimeUnit.MILLISECONDS),
         300));
+    this.datasetLockCondition = datasetLock.newCondition();
+
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -521,7 +526,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Disable the volume from the service.
       asyncDiskService.removeVolume(sd.getCurrentDir());
       volumes.removeVolume(absRoot, clearFailure);
-      volumes.waitVolumeRemoved(5000, this);
+      volumes.waitVolumeRemoved(5000, datasetLockCondition);
 
       // Removed all replica information for the blocks on the volume.
       // Unlike updating the volumeMap in addVolume(), this operation does
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index ea4d5975cd0..634ad42d89c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -31,6 +31,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -52,7 +55,8 @@ class FsVolumeList {
       Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
   private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
       new ConcurrentLinkedQueue<>();
-  private Object checkDirsMutex = new Object();
+  private final AutoCloseableLock checkDirsLock;
+  private final Condition checkDirsLockCondition;
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private final BlockScanner blockScanner;
 
@@ -62,6 +66,8 @@ class FsVolumeList {
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
+    this.checkDirsLock = new AutoCloseableLock();
+    this.checkDirsLockCondition = checkDirsLock.newCondition();
     for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
@@ -224,12 +230,12 @@ class FsVolumeList {
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
    *
-   * Use checkDirsMutext to allow only one instance of checkDirs() call
+   * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
   *
    * @return list of all the failed volumes.
    */
  Set checkDirs() {
-    synchronized(checkDirsMutex) {
+    try (AutoCloseableLock lock = checkDirsLock.acquire()) {
       Set failedVols = null;
 
       // Make a copy of volumes for performing modification
@@ -260,7 +266,7 @@ class FsVolumeList {
             + " failure volumes.");
       }
 
-      waitVolumeRemoved(5000, checkDirsMutex);
+      waitVolumeRemoved(5000, checkDirsLockCondition);
       return failedVols;
     }
   }
@@ -271,13 +277,13 @@ class FsVolumeList {
    *
    * @param sleepMillis interval to recheck.
    */
-  void waitVolumeRemoved(int sleepMillis, Object monitor) {
+  void waitVolumeRemoved(int sleepMillis, Condition condition) {
     while (!checkVolumesRemoved()) {
       if (FsDatasetImpl.LOG.isDebugEnabled()) {
         FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
       }
       try {
-        monitor.wait(sleepMillis);
+        condition.await(sleepMillis, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
             + "volume reference to be released.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 69349fc469c..3f39c8790e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -639,11 +639,11 @@ public class TestFsDatasetImpl {
       Set volumesToRemove = new HashSet<>();
       try {
         volumesToRemove.add(StorageLocation.parse(
-          dataset.getVolume(eb).getBasePath()).getFile());
+            dataset.getVolume(eb).getBasePath()).getFile());
       } catch (Exception e) {
-        LOG.info("Problem preparing volumes to remove: " + e);
+        LOG.info("Problem preparing volumes to remove: ", e);
         Assert.fail("Exception in remove volume thread, check log for " +
-          "details.");
+            "details.");
       }
       LOG.info("Removing volume " + volumesToRemove);
       dataset.removeVolumes(volumesToRemove, true);
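
The pattern these hunks converge on, waiting on a java.util.concurrent.locks.Condition obtained from the new AutoCloseableLock#newCondition() rather than calling Object.wait() on a shared monitor, is sketched below in isolation. The sketch is not part of the patch: the class name VolumeRemovalWaiter and the removed flag are hypothetical stand-ins for the state that FsVolumeList#checkVolumesRemoved() inspects, and it assumes hadoop-common with this change on the classpath. Unlike Object.wait(), which requires a synchronized block on the monitor object, Condition.await() works with the explicit ReentrantLock wrapped by AutoCloseableLock, so the same try-with-resources acquire()/close() discipline covers both mutual exclusion and waiting.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import org.apache.hadoop.util.AutoCloseableLock;

/**
 * Hypothetical example class (not in the patch) demonstrating the
 * AutoCloseableLock + Condition wait/notify pattern adopted above.
 */
public class VolumeRemovalWaiter {
  private final AutoCloseableLock lock = new AutoCloseableLock();
  private final Condition removalDone = lock.newCondition();
  private boolean removed = false; // guarded by lock; stand-in for checkVolumesRemoved()

  /** Waiter side: re-check the flag, sleeping at most sleepMillis per round. */
  public void waitUntilRemoved(long sleepMillis) {
    try (AutoCloseableLock l = lock.acquire()) { // lock is released by close()
      while (!removed) {
        try {
          // Same call shape as waitVolumeRemoved() after the patch; wakes up
          // early if markRemoved() signals before the timeout elapses.
          removalDone.await(sleepMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          // The patch logs and keeps waiting; this sketch restores the
          // interrupt status and gives up instead.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /** Notifier side: must hold the same lock before signalling the condition. */
  public void markRemoved() {
    try (AutoCloseableLock l = lock.acquire()) {
      removed = true;
      removalDone.signalAll();
    }
  }
}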