diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5ca7f8a82a8..36bb60d7e52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1021,6 +1021,9 @@ Release 2.9.0 - UNRELEASED HDFS-9691. TestBlockManagerSafeMode#testCheckSafeMode fails intermittently. (Mingliang Liu via aajisaka) + HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. + (Wei Zhou via wang) + Release 2.8.0 - UNRELEASED NEW FEATURES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java index d0d36ba1df5..39d9547c6c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; /** @@ -39,11 +40,15 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; * new replica allocation. By default this policy prefers assigning replicas to * those volumes with more available free space, so as to over time balance the * available space of all the volumes within a DN. + * Use fine-grained locks to enable choosing volumes of different storage + * types concurrently. */ public class AvailableSpaceVolumeChoosingPolicy implements VolumeChoosingPolicy, Configurable { private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class); + + private Object[] syncLocks; private final Random random; @@ -52,14 +57,24 @@ public class AvailableSpaceVolumeChoosingPolicy AvailableSpaceVolumeChoosingPolicy(Random random) { this.random = random; + initLocks(); } public AvailableSpaceVolumeChoosingPolicy() { this(new Random()); + initLocks(); + } + + private void initLocks() { + int numStorageTypes = StorageType.values().length; + syncLocks = new Object[numStorageTypes]; + for (int i = 0; i < numStorageTypes; i++) { + syncLocks[i] = new Object(); + } } @Override - public synchronized void setConf(Configuration conf) { + public void setConf(Configuration conf) { balancedSpaceThreshold = conf.getLong( DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY, DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT); @@ -85,7 +100,7 @@ public class AvailableSpaceVolumeChoosingPolicy } @Override - public synchronized Configuration getConf() { + public Configuration getConf() { // Nothing to do. Only added to fulfill the Configurable contract. return null; } @@ -98,12 +113,24 @@ public class AvailableSpaceVolumeChoosingPolicy new RoundRobinVolumeChoosingPolicy(); @Override - public synchronized V chooseVolume(List volumes, + public V chooseVolume(List volumes, long replicaSize) throws IOException { if (volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } - + // As all the items in volumes are with the same storage type, + // so only need to get the storage type index of the first item in volumes + StorageType storageType = volumes.get(0).getStorageType(); + int index = storageType != null ? + storageType.ordinal() : StorageType.DEFAULT.ordinal(); + + synchronized (syncLocks[index]) { + return doChooseVolume(volumes, replicaSize); + } + } + + private V doChooseVolume(final List volumes, + long replicaSize) throws IOException { AvailableSpaceVolumeList volumesWithSpaces = new AvailableSpaceVolumeList(volumes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java index 19452d80500..9474b92d993 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java @@ -22,46 +22,75 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; /** - * Choose volumes in round-robin order. + * Choose volumes with the same storage type in round-robin order. + * Use fine-grained locks to synchronize volume choosing. */ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy { public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class); - private int curVolume = 0; + // curVolumes stores the RR counters of each storage type. + // The ordinal of storage type in org.apache.hadoop.fs.StorageType + // is used as the index to get data from the array. + private int[] curVolumes; + // syncLocks stores the locks for each storage type. + private Object[] syncLocks; + + public RoundRobinVolumeChoosingPolicy() { + int numStorageTypes = StorageType.values().length; + curVolumes = new int[numStorageTypes]; + syncLocks = new Object[numStorageTypes]; + for (int i = 0; i < numStorageTypes; i++) { + syncLocks[i] = new Object(); + } + } @Override - public synchronized V chooseVolume(final List volumes, long blockSize) + public V chooseVolume(final List volumes, long blockSize) throws IOException { - if(volumes.size() < 1) { + if (volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } - + + // As all the items in volumes are with the same storage type, + // so only need to get the storage type index of the first item in volumes + StorageType storageType = volumes.get(0).getStorageType(); + int index = storageType != null ? + storageType.ordinal() : StorageType.DEFAULT.ordinal(); + + synchronized (syncLocks[index]) { + return chooseVolume(index, volumes, blockSize); + } + } + + private V chooseVolume(final int curVolumeIndex, final List volumes, + long blockSize) throws IOException { // since volumes could've been removed because of the failure // make sure we are not out of bounds - if(curVolume >= volumes.size()) { - curVolume = 0; - } - + int curVolume = curVolumes[curVolumeIndex] < volumes.size() + ? curVolumes[curVolumeIndex] : 0; + int startVolume = curVolume; long maxAvailable = 0; - + while (true) { final V volume = volumes.get(curVolume); curVolume = (curVolume + 1) % volumes.size(); long availableVolumeSize = volume.getAvailable(); if (availableVolumeSize > blockSize) { + curVolumes[curVolumeIndex] = curVolume; return volume; } - + if (availableVolumeSize > maxAvailable) { maxAvailable = availableVolumeSize; } - + if (curVolume == startVolume) { throw new DiskOutOfSpaceException("Out of space: " + "The volume with the most available space (=" + maxAvailable diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java index 9818a01b694..9b3047f9499 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.ReflectionUtils; import org.junit.Assert; @@ -102,4 +103,59 @@ public class TestRoundRobinVolumeChoosingPolicy { } } + // Test Round-Robin choosing algorithm with heterogeneous storage. + @Test + public void testRRPolicyWithStorageTypes() throws Exception { + final RoundRobinVolumeChoosingPolicy policy + = new RoundRobinVolumeChoosingPolicy(); + testRRPolicyWithStorageTypes(policy); + } + + public static void testRRPolicyWithStorageTypes( + VolumeChoosingPolicy policy) throws Exception { + final List diskVolumes = new ArrayList(); + final List ssdVolumes = new ArrayList(); + + // Add two DISK volumes to diskVolumes + diskVolumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(diskVolumes.get(0).getStorageType()) + .thenReturn(StorageType.DISK); + Mockito.when(diskVolumes.get(0).getAvailable()).thenReturn(100L); + diskVolumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(diskVolumes.get(1).getStorageType()) + .thenReturn(StorageType.DISK); + Mockito.when(diskVolumes.get(1).getAvailable()).thenReturn(100L); + + // Add two SSD volumes to ssdVolumes + ssdVolumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(ssdVolumes.get(0).getStorageType()) + .thenReturn(StorageType.SSD); + Mockito.when(ssdVolumes.get(0).getAvailable()).thenReturn(200L); + ssdVolumes.add(Mockito.mock(FsVolumeSpi.class)); + Mockito.when(ssdVolumes.get(1).getStorageType()) + .thenReturn(StorageType.SSD); + Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L); + + Assert.assertEquals(diskVolumes.get(0), + policy.chooseVolume(diskVolumes, 0)); + // Independent Round-Robin for different storage type + Assert.assertEquals(ssdVolumes.get(0), + policy.chooseVolume(ssdVolumes, 0)); + // Take block size into consideration + Assert.assertEquals(ssdVolumes.get(0), + policy.chooseVolume(ssdVolumes, 150L)); + + Assert.assertEquals(diskVolumes.get(1), + policy.chooseVolume(diskVolumes, 0)); + Assert.assertEquals(diskVolumes.get(0), + policy.chooseVolume(diskVolumes, 50L)); + + try { + policy.chooseVolume(diskVolumes, 200L); + Assert.fail("Should throw an DiskOutOfSpaceException before this!"); + } catch (DiskOutOfSpaceException e) { + // Pass. + } + } + }