HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. Contributed by Wei Zhou.

(cherry picked from commit 3a23dc683c)
This commit is contained in:
Andrew Wang 2016-02-17 11:29:10 -08:00
parent e7d1507e21
commit c22aedac68
4 changed files with 131 additions and 16 deletions

View File

@ -83,6 +83,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9691. TestBlockManagerSafeMode#testCheckSafeMode fails intermittently.
(Mingliang Liu via aajisaka)
HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
(Wei Zhou via wang)
Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
@ -39,12 +40,16 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
* new replica allocation. By default this policy prefers assigning replicas to
* those volumes with more available free space, so as to over time balance the
* available space of all the volumes within a DN.
* Use fine-grained locks to enable choosing volumes of different storage
* types concurrently.
*/
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V>, Configurable {
private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
private Object[] syncLocks;
private final Random random;
private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
@ -52,14 +57,24 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
AvailableSpaceVolumeChoosingPolicy(Random random) {
this.random = random;
initLocks();
}
public AvailableSpaceVolumeChoosingPolicy() {
this(new Random());
initLocks();
}
private void initLocks() {
int numStorageTypes = StorageType.values().length;
syncLocks = new Object[numStorageTypes];
for (int i = 0; i < numStorageTypes; i++) {
syncLocks[i] = new Object();
}
}
@Override
public synchronized void setConf(Configuration conf) {
public void setConf(Configuration conf) {
balancedSpaceThreshold = conf.getLong(
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY,
DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT);
@ -85,7 +100,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
}
@Override
public synchronized Configuration getConf() {
public Configuration getConf() {
// Nothing to do. Only added to fulfill the Configurable contract.
return null;
}
@ -98,12 +113,24 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
new RoundRobinVolumeChoosingPolicy<V>();
@Override
public synchronized V chooseVolume(List<V> volumes,
public V chooseVolume(List<V> volumes,
long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
// As all the items in volumes are with the same storage type,
// so only need to get the storage type index of the first item in volumes
StorageType storageType = volumes.get(0).getStorageType();
int index = storageType != null ?
storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) {
return doChooseVolume(volumes, replicaSize);
}
}
private V doChooseVolume(final List<V> volumes,
long replicaSize) throws IOException {
AvailableSpaceVolumeList volumesWithSpaces =
new AvailableSpaceVolumeList(volumes);

View File

@ -22,30 +22,58 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
* Choose volumes in round-robin order.
* Choose volumes with the same storage type in round-robin order.
* Use fine-grained locks to synchronize volume choosing.
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> {
public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
private int curVolume = 0;
// curVolumes stores the RR counters of each storage type.
// The ordinal of storage type in org.apache.hadoop.fs.StorageType
// is used as the index to get data from the array.
private int[] curVolumes;
// syncLocks stores the locks for each storage type.
private Object[] syncLocks;
public RoundRobinVolumeChoosingPolicy() {
int numStorageTypes = StorageType.values().length;
curVolumes = new int[numStorageTypes];
syncLocks = new Object[numStorageTypes];
for (int i = 0; i < numStorageTypes; i++) {
syncLocks[i] = new Object();
}
}
@Override
public synchronized V chooseVolume(final List<V> volumes, long blockSize)
public V chooseVolume(final List<V> volumes, long blockSize)
throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
// As all the items in volumes are with the same storage type,
// so only need to get the storage type index of the first item in volumes
StorageType storageType = volumes.get(0).getStorageType();
int index = storageType != null ?
storageType.ordinal() : StorageType.DEFAULT.ordinal();
synchronized (syncLocks[index]) {
return chooseVolume(index, volumes, blockSize);
}
}
private V chooseVolume(final int curVolumeIndex, final List<V> volumes,
long blockSize) throws IOException {
// since volumes could've been removed because of the failure
// make sure we are not out of bounds
if(curVolume >= volumes.size()) {
curVolume = 0;
}
int curVolume = curVolumes[curVolumeIndex] < volumes.size()
? curVolumes[curVolumeIndex] : 0;
int startVolume = curVolume;
long maxAvailable = 0;
@ -55,6 +83,7 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) {
curVolumes[curVolumeIndex] = curVolume;
return volume;
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
@ -102,4 +103,59 @@ public class TestRoundRobinVolumeChoosingPolicy {
}
}
// Test Round-Robin choosing algorithm with heterogeneous storage.
@Test
public void testRRPolicyWithStorageTypes() throws Exception {
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
= new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
testRRPolicyWithStorageTypes(policy);
}
public static void testRRPolicyWithStorageTypes(
VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
final List<FsVolumeSpi> diskVolumes = new ArrayList<FsVolumeSpi>();
final List<FsVolumeSpi> ssdVolumes = new ArrayList<FsVolumeSpi>();
// Add two DISK volumes to diskVolumes
diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(diskVolumes.get(0).getStorageType())
.thenReturn(StorageType.DISK);
Mockito.when(diskVolumes.get(0).getAvailable()).thenReturn(100L);
diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(diskVolumes.get(1).getStorageType())
.thenReturn(StorageType.DISK);
Mockito.when(diskVolumes.get(1).getAvailable()).thenReturn(100L);
// Add two SSD volumes to ssdVolumes
ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(ssdVolumes.get(0).getStorageType())
.thenReturn(StorageType.SSD);
Mockito.when(ssdVolumes.get(0).getAvailable()).thenReturn(200L);
ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(ssdVolumes.get(1).getStorageType())
.thenReturn(StorageType.SSD);
Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
Assert.assertEquals(diskVolumes.get(0),
policy.chooseVolume(diskVolumes, 0));
// Independent Round-Robin for different storage type
Assert.assertEquals(ssdVolumes.get(0),
policy.chooseVolume(ssdVolumes, 0));
// Take block size into consideration
Assert.assertEquals(ssdVolumes.get(0),
policy.chooseVolume(ssdVolumes, 150L));
Assert.assertEquals(diskVolumes.get(1),
policy.chooseVolume(diskVolumes, 0));
Assert.assertEquals(diskVolumes.get(0),
policy.chooseVolume(diskVolumes, 50L));
try {
policy.chooseVolume(diskVolumes, 200L);
Assert.fail("Should throw an DiskOutOfSpaceException before this!");
} catch (DiskOutOfSpaceException e) {
// Pass.
}
}
}