HDFS-1804. Add a new block-volume device choosing policy that looks at free space. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bcbe100541
commit
a9d515aed8
|
@ -330,4 +330,14 @@ public abstract class GenericTestUtils {
|
||||||
" but got:\n" + output,
|
" but got:\n" + output,
|
||||||
Pattern.compile(pattern).matcher(output).find());
|
Pattern.compile(pattern).matcher(output).find());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void assertValueNear(long expected, long actual, long allowedError) {
|
||||||
|
assertValueWithinRange(expected - allowedError, expected + allowedError, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void assertValueWithinRange(long expectedMin, long expectedMax,
|
||||||
|
long actual) {
|
||||||
|
Assert.assertTrue("Expected " + actual + " to be in range (" + expectedMin + ","
|
||||||
|
+ expectedMax + ")", expectedMin <= actual && actual <= expectedMax);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -358,6 +358,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
HDFS-1804. Add a new block-volume device choosing policy that looks at
|
||||||
|
free space. (atm)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
|
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
|
||||||
|
|
|
@ -369,6 +369,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
|
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
|
||||||
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
|
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
|
||||||
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
|
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
|
||||||
|
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
|
||||||
|
public static final long DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
|
||||||
|
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-percent";
|
||||||
|
public static final float DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_DEFAULT = 0.75f;
|
||||||
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
|
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
|
||||||
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
|
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
|
||||||
public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";
|
public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";
|
||||||
|
|
|
@ -0,0 +1,259 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A DN volume choosing policy which takes into account the amount of free
|
||||||
|
* space on each of the available volumes when considering where to assign a
|
||||||
|
* new replica allocation. By default this policy prefers assigning replicas to
|
||||||
|
* those volumes with more available free space, so as to over time balance the
|
||||||
|
* available space of all the volumes within a DN.
|
||||||
|
*/
|
||||||
|
public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
|
||||||
|
implements VolumeChoosingPolicy<V>, Configurable {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
|
||||||
|
|
||||||
|
private static final Random RAND = new Random();
|
||||||
|
|
||||||
|
private long balancedSpaceThreshold = DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_DEFAULT;
|
||||||
|
private float balancedPreferencePercent = DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_DEFAULT;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void setConf(Configuration conf) {
|
||||||
|
balancedSpaceThreshold = conf.getLong(
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY,
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_DEFAULT);
|
||||||
|
balancedPreferencePercent = conf.getFloat(
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY,
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_DEFAULT);
|
||||||
|
|
||||||
|
LOG.info("Available space volume choosing policy initialized: " +
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY +
|
||||||
|
" = " + balancedSpaceThreshold + ", " +
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY +
|
||||||
|
" = " + balancedPreferencePercent);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized Configuration getConf() {
|
||||||
|
// Nothing to do. Only added to fulfill the Configurable contract.
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private VolumeChoosingPolicy<V> roundRobinPolicyBalanced =
|
||||||
|
new RoundRobinVolumeChoosingPolicy<V>();
|
||||||
|
private VolumeChoosingPolicy<V> roundRobinPolicyHighAvailable =
|
||||||
|
new RoundRobinVolumeChoosingPolicy<V>();
|
||||||
|
private VolumeChoosingPolicy<V> roundRobinPolicyLowAvailable =
|
||||||
|
new RoundRobinVolumeChoosingPolicy<V>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized V chooseVolume(List<V> volumes,
|
||||||
|
final long replicaSize) throws IOException {
|
||||||
|
if (volumes.size() < 1) {
|
||||||
|
throw new DiskOutOfSpaceException("No more available volumes");
|
||||||
|
}
|
||||||
|
|
||||||
|
AvailableSpaceVolumeList volumesWithSpaces =
|
||||||
|
new AvailableSpaceVolumeList(volumes);
|
||||||
|
|
||||||
|
if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
|
||||||
|
// If they're actually not too far out of whack, fall back on pure round
|
||||||
|
// robin.
|
||||||
|
V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("All volumes are within the configured free space balance " +
|
||||||
|
"threshold. Selecting " + volume + " for write of block size " +
|
||||||
|
replicaSize);
|
||||||
|
}
|
||||||
|
return volume;
|
||||||
|
} else {
|
||||||
|
V volume = null;
|
||||||
|
// If none of the volumes with low free space have enough space for the
|
||||||
|
// replica, always try to choose a volume with a lot of free space.
|
||||||
|
long mostAvailableAmongLowVolumes = volumesWithSpaces
|
||||||
|
.getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();
|
||||||
|
|
||||||
|
List<V> highAvailableVolumes = extractVolumesFromPairs(
|
||||||
|
volumesWithSpaces.getVolumesWithHighAvailableSpace());
|
||||||
|
List<V> lowAvailableVolumes = extractVolumesFromPairs(
|
||||||
|
volumesWithSpaces.getVolumesWithLowAvailableSpace());
|
||||||
|
|
||||||
|
float preferencePercentScaler =
|
||||||
|
(highAvailableVolumes.size() * balancedPreferencePercent) +
|
||||||
|
(lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
|
||||||
|
float scaledPreferencePercent =
|
||||||
|
(highAvailableVolumes.size() * balancedPreferencePercent) /
|
||||||
|
preferencePercentScaler;
|
||||||
|
if (mostAvailableAmongLowVolumes < replicaSize ||
|
||||||
|
RAND.nextFloat() < scaledPreferencePercent) {
|
||||||
|
volume = roundRobinPolicyHighAvailable.chooseVolume(
|
||||||
|
highAvailableVolumes,
|
||||||
|
replicaSize);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Volumes are imbalanced. Selecting " + volume +
|
||||||
|
" from high available space volumes for write of block size "
|
||||||
|
+ replicaSize);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
volume = roundRobinPolicyLowAvailable.chooseVolume(
|
||||||
|
lowAvailableVolumes,
|
||||||
|
replicaSize);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Volumes are imbalanced. Selecting " + volume +
|
||||||
|
" from low available space volumes for write of block size "
|
||||||
|
+ replicaSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return volume;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to keep track of the list of volumes we're choosing from.
|
||||||
|
*/
|
||||||
|
private class AvailableSpaceVolumeList {
|
||||||
|
private final List<AvailableSpaceVolumePair> volumes;
|
||||||
|
|
||||||
|
public AvailableSpaceVolumeList(List<V> volumes) throws IOException {
|
||||||
|
this.volumes = new ArrayList<AvailableSpaceVolumePair>();
|
||||||
|
for (V volume : volumes) {
|
||||||
|
this.volumes.add(new AvailableSpaceVolumePair(volume));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the available space on all the volumes is roughly equal.
|
||||||
|
*
|
||||||
|
* @param volumes the volumes to check
|
||||||
|
* @return true if all volumes' free space is within the configured threshold,
|
||||||
|
* false otherwise.
|
||||||
|
* @throws IOException
|
||||||
|
* in the event of error checking amount of available space
|
||||||
|
*/
|
||||||
|
public boolean areAllVolumesWithinFreeSpaceThreshold() {
|
||||||
|
long leastAvailable = Long.MAX_VALUE;
|
||||||
|
long mostAvailable = 0;
|
||||||
|
for (AvailableSpaceVolumePair volume : volumes) {
|
||||||
|
leastAvailable = Math.min(leastAvailable, volume.getAvailable());
|
||||||
|
mostAvailable = Math.max(mostAvailable, volume.getAvailable());
|
||||||
|
}
|
||||||
|
return (mostAvailable - leastAvailable) < balancedSpaceThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the minimum amount of space available on a single volume,
|
||||||
|
* across all volumes.
|
||||||
|
*/
|
||||||
|
private long getLeastAvailableSpace() {
|
||||||
|
long leastAvailable = Long.MAX_VALUE;
|
||||||
|
for (AvailableSpaceVolumePair volume : volumes) {
|
||||||
|
leastAvailable = Math.min(leastAvailable, volume.getAvailable());
|
||||||
|
}
|
||||||
|
return leastAvailable;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the maximum amount of space available across volumes with low space.
|
||||||
|
*/
|
||||||
|
public long getMostAvailableSpaceAmongVolumesWithLowAvailableSpace() {
|
||||||
|
long mostAvailable = Long.MIN_VALUE;
|
||||||
|
for (AvailableSpaceVolumePair volume : getVolumesWithLowAvailableSpace()) {
|
||||||
|
mostAvailable = Math.max(mostAvailable, volume.getAvailable());
|
||||||
|
}
|
||||||
|
return mostAvailable;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of volumes with relatively low available space.
|
||||||
|
*/
|
||||||
|
public List<AvailableSpaceVolumePair> getVolumesWithLowAvailableSpace() {
|
||||||
|
long leastAvailable = getLeastAvailableSpace();
|
||||||
|
List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
|
||||||
|
for (AvailableSpaceVolumePair volume : volumes) {
|
||||||
|
if (volume.getAvailable() <= leastAvailable + balancedSpaceThreshold) {
|
||||||
|
ret.add(volume);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of volumes with a lot of available space.
|
||||||
|
*/
|
||||||
|
public List<AvailableSpaceVolumePair> getVolumesWithHighAvailableSpace() {
|
||||||
|
long leastAvailable = getLeastAvailableSpace();
|
||||||
|
List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
|
||||||
|
for (AvailableSpaceVolumePair volume : volumes) {
|
||||||
|
if (volume.getAvailable() > leastAvailable + balancedSpaceThreshold) {
|
||||||
|
ret.add(volume);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used so that we only check the available space on a given volume once, at
|
||||||
|
* the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}.
|
||||||
|
*/
|
||||||
|
private class AvailableSpaceVolumePair {
|
||||||
|
private final V volume;
|
||||||
|
private final long availableSpace;
|
||||||
|
|
||||||
|
public AvailableSpaceVolumePair(V volume) throws IOException {
|
||||||
|
this.volume = volume;
|
||||||
|
this.availableSpace = volume.getAvailable();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAvailable() {
|
||||||
|
return availableSpace;
|
||||||
|
}
|
||||||
|
|
||||||
|
public V getVolume() {
|
||||||
|
return volume;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<V> extractVolumesFromPairs(List<AvailableSpaceVolumePair> volumes) {
|
||||||
|
List<V> ret = new ArrayList<V>();
|
||||||
|
for (AvailableSpaceVolumePair volume : volumes) {
|
||||||
|
ret.add(volume.getVolume());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1231,4 +1231,32 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.fsdataset.volume.choosing.balanced-space-threshold</name>
|
||||||
|
<value>10737418240</value> <!-- 10 GB -->
|
||||||
|
<description>
|
||||||
|
Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to
|
||||||
|
org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy.
|
||||||
|
This setting controls how much DN volumes are allowed to differ in terms of
|
||||||
|
bytes of free disk space before they are considered imbalanced. If the free
|
||||||
|
space of all the volumes are within this range of each other, the volumes
|
||||||
|
will be considered balanced and block assignments will be done on a pure
|
||||||
|
round robin basis.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.fsdataset.volume.choosing.balanced-space-preference-percent</name>
|
||||||
|
<value>0.75f</value>
|
||||||
|
<description>
|
||||||
|
Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to
|
||||||
|
org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy.
|
||||||
|
This setting controls what percentage of new block allocations will be sent
|
||||||
|
to volumes with more available disk space than others. This setting should
|
||||||
|
be in the range 0.0 - 1.0, though in practice 0.5 - 1.0, since there should
|
||||||
|
be no reason to prefer that volumes with less available disk space receive
|
||||||
|
more block allocations.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -0,0 +1,303 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class TestAvailableSpaceVolumeChoosingPolicy {
|
||||||
|
|
||||||
|
private static final int RANDOMIZED_ITERATIONS = 10000;
|
||||||
|
private static final float RANDOMIZED_ERROR_PERCENT = 0.05f;
|
||||||
|
private static final long RANDOMIZED_ALLOWED_ERROR = (long) (RANDOMIZED_ERROR_PERCENT * RANDOMIZED_ITERATIONS);
|
||||||
|
|
||||||
|
private static void initPolicy(VolumeChoosingPolicy<FsVolumeSpi> policy,
|
||||||
|
float preferencePercent) {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// Set the threshold to consider volumes imbalanced to 1MB
|
||||||
|
conf.setLong(
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_THRESHOLD_KEY,
|
||||||
|
1024 * 1024); // 1MB
|
||||||
|
conf.setFloat(
|
||||||
|
DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_BALANCED_SPACE_PREFERENCE_PERCENT_KEY,
|
||||||
|
preferencePercent);
|
||||||
|
((Configurable) policy).setConf(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the Round-Robin block-volume fallback path when all volumes are within
|
||||||
|
// the threshold.
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testRR() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
TestRoundRobinVolumeChoosingPolicy.testRR(policy);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChooseVolume should throw DiskOutOfSpaceException
|
||||||
|
// with volume and block sizes in exception message.
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testRRPolicyExceptionMessage() throws Exception {
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy
|
||||||
|
= new AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi>();
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
TestRoundRobinVolumeChoosingPolicy.testRRPolicyExceptionMessage(policy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testTwoUnbalancedVolumes() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// First volume with 1MB free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(0).getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
|
||||||
|
// Second volume with 3MB free space, which is a difference of 2MB, more
|
||||||
|
// than the threshold of 1MB.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testThreeUnbalancedVolumes() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// First volume with 1MB free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(0).getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
|
||||||
|
// Second volume with 3MB free space, which is a difference of 2MB, more
|
||||||
|
// than the threshold of 1MB.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
// Third volume, again with 3MB free space.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
// We should alternate assigning between the two volumes with a lot of free
|
||||||
|
// space.
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
|
||||||
|
|
||||||
|
// All writes should be assigned to the volume with the least free space.
|
||||||
|
initPolicy(policy, 0.0f);
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testFourUnbalancedVolumes() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// First volume with 1MB free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(0).getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
|
||||||
|
// Second volume with 1MB + 1 byte free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L + 1);
|
||||||
|
|
||||||
|
// Third volume with 3MB free space, which is a difference of 2MB, more
|
||||||
|
// than the threshold of 1MB.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
// Fourth volume, again with 3MB free space.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
// We should alternate assigning between the two volumes with a lot of free
|
||||||
|
// space.
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100));
|
||||||
|
|
||||||
|
// We should alternate assigning between the two volumes with less free
|
||||||
|
// space.
|
||||||
|
initPolicy(policy, 0.0f);
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100));
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testNotEnoughSpaceOnSelectedVolume() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// First volume with 1MB free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(0).getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
|
||||||
|
// Second volume with 3MB free space, which is a difference of 2MB, more
|
||||||
|
// than the threshold of 1MB.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
|
||||||
|
// All writes should be assigned to the volume with the least free space.
|
||||||
|
// However, if the volume with the least free space doesn't have enough
|
||||||
|
// space to accept the replica size, and another volume does have enough
|
||||||
|
// free space, that should be chosen instead.
|
||||||
|
initPolicy(policy, 0.0f);
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testAvailableSpaceChanges() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
initPolicy(policy, 1.0f);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// First volume with 1MB free space
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(0).getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
|
||||||
|
// Second volume with 3MB free space, which is a difference of 2MB, more
|
||||||
|
// than the threshold of 1MB.
|
||||||
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
|
Mockito.when(volumes.get(1).getAvailable())
|
||||||
|
.thenReturn(1024L * 1024L * 3)
|
||||||
|
.thenReturn(1024L * 1024L * 3)
|
||||||
|
.thenReturn(1024L * 1024L * 3)
|
||||||
|
.thenReturn(1024L * 1024L * 1); // After the third check, return 1MB.
|
||||||
|
|
||||||
|
// Should still be able to get a volume for the replica even though the
|
||||||
|
// available space on the second volume changed.
|
||||||
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void randomizedTest1() throws Exception {
|
||||||
|
doRandomizedTest(0.75f, 1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void randomizedTest2() throws Exception {
|
||||||
|
doRandomizedTest(0.75f, 5, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void randomizedTest3() throws Exception {
|
||||||
|
doRandomizedTest(0.75f, 1, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void randomizedTest4() throws Exception {
|
||||||
|
doRandomizedTest(0.90f, 5, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ensure that we randomly select the lesser-used volumes with appropriate
|
||||||
|
* frequency.
|
||||||
|
*/
|
||||||
|
public void doRandomizedTest(float preferencePercent, int lowSpaceVolumes,
|
||||||
|
int highSpaceVolumes) throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(AvailableSpaceVolumeChoosingPolicy.class, null);
|
||||||
|
|
||||||
|
List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
|
// Volumes with 1MB free space
|
||||||
|
for (int i = 0; i < lowSpaceVolumes; i++) {
|
||||||
|
FsVolumeSpi volume = Mockito.mock(FsVolumeSpi.class);
|
||||||
|
Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L);
|
||||||
|
volumes.add(volume);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Volumes with 3MB free space
|
||||||
|
for (int i = 0; i < highSpaceVolumes; i++) {
|
||||||
|
FsVolumeSpi volume = Mockito.mock(FsVolumeSpi.class);
|
||||||
|
Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3);
|
||||||
|
volumes.add(volume);
|
||||||
|
}
|
||||||
|
|
||||||
|
initPolicy(policy, preferencePercent);
|
||||||
|
long lowAvailableSpaceVolumeSelected = 0;
|
||||||
|
long highAvailableSpaceVolumeSelected = 0;
|
||||||
|
for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) {
|
||||||
|
FsVolumeSpi volume = policy.chooseVolume(volumes, 100);
|
||||||
|
for (int j = 0; j < volumes.size(); j++) {
|
||||||
|
// Note how many times the first low available volume was selected
|
||||||
|
if (volume == volumes.get(j) && j == 0) {
|
||||||
|
lowAvailableSpaceVolumeSelected++;
|
||||||
|
}
|
||||||
|
// Note how many times the first high available volume was selected
|
||||||
|
if (volume == volumes.get(j) && j == lowSpaceVolumes) {
|
||||||
|
highAvailableSpaceVolumeSelected++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the expected ratio of how often low available space volumes
|
||||||
|
// were selected vs. high available space volumes.
|
||||||
|
float expectedSelectionRatio = preferencePercent / (1 - preferencePercent);
|
||||||
|
|
||||||
|
GenericTestUtils.assertValueNear(
|
||||||
|
(long)(lowAvailableSpaceVolumeSelected * expectedSelectionRatio),
|
||||||
|
highAvailableSpaceVolumeSelected,
|
||||||
|
RANDOMIZED_ALLOWED_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -32,6 +32,14 @@ public class TestRoundRobinVolumeChoosingPolicy {
|
||||||
// Test the Round-Robin block-volume choosing algorithm.
|
// Test the Round-Robin block-volume choosing algorithm.
|
||||||
@Test
|
@Test
|
||||||
public void testRR() throws Exception {
|
public void testRR() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
|
||||||
|
ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, null);
|
||||||
|
testRR(policy);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void testRR(VolumeChoosingPolicy<FsVolumeSpi> policy)
|
||||||
|
throws Exception {
|
||||||
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
// First volume, with 100 bytes of space.
|
// First volume, with 100 bytes of space.
|
||||||
|
@ -42,10 +50,6 @@ public class TestRoundRobinVolumeChoosingPolicy {
|
||||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
|
|
||||||
ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, null);
|
|
||||||
|
|
||||||
// Test two rounds of round-robin choosing
|
// Test two rounds of round-robin choosing
|
||||||
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
|
Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0));
|
||||||
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
|
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0));
|
||||||
|
@ -69,6 +73,13 @@ public class TestRoundRobinVolumeChoosingPolicy {
|
||||||
// with volume and block sizes in exception message.
|
// with volume and block sizes in exception message.
|
||||||
@Test
|
@Test
|
||||||
public void testRRPolicyExceptionMessage() throws Exception {
|
public void testRRPolicyExceptionMessage() throws Exception {
|
||||||
|
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
|
||||||
|
= new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
|
||||||
|
testRRPolicyExceptionMessage(policy);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void testRRPolicyExceptionMessage(
|
||||||
|
VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
|
||||||
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
|
||||||
|
|
||||||
// First volume, with 500 bytes of space.
|
// First volume, with 500 bytes of space.
|
||||||
|
@ -79,8 +90,6 @@ public class TestRoundRobinVolumeChoosingPolicy {
|
||||||
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
volumes.add(Mockito.mock(FsVolumeSpi.class));
|
||||||
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
|
Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L);
|
||||||
|
|
||||||
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
|
|
||||||
= new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
|
|
||||||
int blockSize = 700;
|
int blockSize = 700;
|
||||||
try {
|
try {
|
||||||
policy.chooseVolume(volumes, blockSize);
|
policy.chooseVolume(volumes, blockSize);
|
||||||
|
|
Loading…
Reference in New Issue