HDFS-16111. Add a configuration to RoundRobinVolumeChoosingPolicy to avoid failed volumes at datanodes. (#3175)

Change-Id: Iead25812d4073e3980893e3e76f7d2b03b57442a

Co-authored-by: Zhihai Xu <zxu@apache.org>
zhihaixu2012 2021-07-27 19:18:44 -07:00 committed by GitHub
parent 10ba4cc892
commit ae20516ebc
5 changed files with 94 additions and 3 deletions

View File

@@ -1112,6 +1112,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
public static final float DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT = 0.75f;
public static final String
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY =
"dfs.datanode.round-robin-volume-choosing-policy.additional-available-space";
public static final long
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT =
1024L * 1024L * 1024L; // 1 GB
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";

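The two new DFSConfigKeys entries above are read like any other datanode setting. A minimal stand-alone sketch (the class name is illustrative and not part of this commit), mirroring the getLong() call that setConf() performs further down in this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class AdditionalAvailableSpaceKeyExample {
  public static void main(String[] args) {
    // HdfsConfiguration loads hdfs-default.xml / hdfs-site.xml, so an operator
    // override of the new key is picked up; otherwise the 1 GB default applies.
    Configuration conf = new HdfsConfiguration();
    long additionalAvailableSpace = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
        DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT);
    System.out.println("additional available space (bytes): " + additionalAvailableSpace);
  }
}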
View File

@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -30,7 +35,7 @@
* Use fine-grained locks to synchronize volume choosing.
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
-   implements VolumeChoosingPolicy<V> {
+   implements VolumeChoosingPolicy<V>, Configurable {
public static final Logger LOG =
LoggerFactory.getLogger(RoundRobinVolumeChoosingPolicy.class);
@@ -41,6 +46,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
// syncLocks stores the locks for each storage type.
private Object[] syncLocks;
// The required additional available space when choosing a volume.
private long additionalAvailableSpace;
public RoundRobinVolumeChoosingPolicy() {
int numStorageTypes = StorageType.values().length;
curVolumes = new int[numStorageTypes];
@@ -50,6 +58,23 @@ public RoundRobinVolumeChoosingPolicy() {
}
}
@Override
public void setConf(Configuration conf) {
additionalAvailableSpace = conf.getLong(
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT);
LOG.info("Round robin volume choosing policy initialized: " +
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY +
" = " + additionalAvailableSpace);
}
@Override
public Configuration getConf() {
// Nothing to do. Only added to fulfill the Configurable contract.
return null;
}
@Override
public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
throws IOException {
@@ -83,7 +108,7 @@ private V chooseVolume(final int curVolumeIndex, final List<V> volumes,
final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
-   if (availableVolumeSize > blockSize) {
+   if (availableVolumeSize > blockSize + additionalAvailableSpace) {
curVolumes[curVolumeIndex] = curVolume;
return volume;
}

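Because the policy now implements Configurable, creating it through ReflectionUtils.newInstance() causes setConf() to run and populate additionalAvailableSpace; the test added at the bottom of this commit relies on exactly that, and the datanode presumably wires it the same way when it instantiates the policy reflectively. A minimal sketch under those assumptions (class name and the 512 MB value are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.util.ReflectionUtils;

public class RoundRobinPolicyWiringExample {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Require 512 MB of headroom on top of the block size when picking a volume.
    conf.setLong(
        DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
        512L * 1024L * 1024L);
    // newInstance() detects the Configurable interface and calls setConf(conf),
    // so the value above ends up in additionalAvailableSpace.
    RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
        ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, conf);
    System.out.println("policy ready: " + policy.getClass().getSimpleName());
  }
}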
View File

@@ -2657,6 +2657,17 @@
</description>
</property>
<property>
<name>dfs.datanode.round-robin-volume-choosing-policy.additional-available-space</name>
<value>1073741824</value> <!-- 1 GB -->
<description>
Only used when dfs.datanode.fsdataset.volume.choosing.policy is set to
org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.
This setting controls how much additional available space (in bytes) a volume
must have, beyond the block size, to be chosen.
</description>
</property>
<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>false</value>

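As a worked example of the description above (all values below are illustrative, not from the commit): with the 1073741824-byte default, a volume reporting roughly 900 MB free is skipped even for a 128 MB block, because the check added to chooseVolume() requires the available space to exceed blockSize + additionalAvailableSpace.

public class AdditionalSpaceThresholdExample {
  public static void main(String[] args) {
    long additionalAvailableSpace = 1024L * 1024L * 1024L; // 1 GB default (1073741824 bytes)
    long blockSize = 128L * 1024L * 1024L;                 // illustrative 128 MB block
    long availableVolumeSize = 900L * 1024L * 1024L;       // volume with roughly 900 MB free

    // Same comparison as the updated chooseVolume(): the block alone would fit,
    // but the volume lacks the extra 1 GB of headroom, so it is passed over.
    boolean eligible = availableVolumeSize > blockSize + additionalAvailableSpace;
    System.out.println("eligible = " + eligible); // prints: eligible = false
  }
}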
View File

@@ -533,6 +533,9 @@ private void initDefaultConfigurations() {
DEFAULT_DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD);
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
conf.setLong(DFSConfigKeys
.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
0);
}
}

View File

@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
@@ -70,7 +73,50 @@ public static void testRR(VolumeChoosingPolicy<FsVolumeSpi> policy)
// Passed.
}
}
// Test the Round-Robin block-volume choosing algorithm with
// additional available space configured.
@Test
@SuppressWarnings("unchecked")
public void testRRWithAdditionalAvailableSpace() throws Exception {
Configuration conf = new Configuration();
// Set the additional available space needed
conf.setLong(
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
100);
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, conf);
testRRWithAdditionalAvailableSpace(policy);
}
public static void testRRWithAdditionalAvailableSpace(
VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();
// First volume, with 100 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);
// Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);
// With 100 bytes of additional available space required, the first volume
// (100 bytes free) is never eligible, so the policy should always choose the
// second one.
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0,
null));
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0,
null));
// Fail if no volume can be chosen: with a 100-byte block, neither volume has
// more than 100 + 100 bytes available.
try {
policy.chooseVolume(volumes, 100, null);
Assert.fail();
} catch (IOException e) {
// Passed.
}
}
// ChooseVolume should throw DiskOutOfSpaceException
// with volume and block sizes in exception message.
@Test