HDFS-12482. Provide a configuration to adjust the weight of EC recovery tasks to adjust the speed of recovery. (lei)

Lei Xu 2017-10-31 21:58:14 -07:00
parent ed24da3dd7
commit 9367c25dbd
7 changed files with 106 additions and 1 deletion
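A minimal sketch of setting the new knob programmatically, for orientation before the diff; the config key and default constants are the ones added by this patch, while the standalone class and the use of `HdfsConfiguration` are only illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EcXmitsWeightConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Lower weights make each EC reconstruction task count for fewer xmits,
    // so the NameNode can schedule more of them per DataNode; higher weights
    // throttle EC recovery harder. The default added by this patch is 0.5.
    conf.setFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY, 1.0f);
    System.out.println(conf.getFloat(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT));
  }
}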

DFSConfigKeys.java

@@ -596,6 +596,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DN_EC_RECONSTRUCTION_THREADS_KEY = "dfs.datanode.ec.reconstruction.threads";
public static final int DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT = 8;
public static final String DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY =
"dfs.datanode.ec.reconstruction.xmits.weight";
public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
0.5f;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =

DataNodeFaultInjector.java

@@ -89,4 +89,10 @@ public class DataNodeFaultInjector {
public void throwTooManyOpenFiles() throws FileNotFoundException {
}
/**
* Used as a hook to inject a failure into the erasure coding reconstruction
* process.
*/
public void stripedBlockReconstruction() throws IOException {}
}

ErasureCodingWorker.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +48,7 @@ public final class ErasureCodingWorker {
private final DataNode datanode;
private final Configuration conf;
private final float xmitWeight;
private ThreadPoolExecutor stripedReconstructionPool;
private ThreadPoolExecutor stripedReadPool;
@@ -54,6 +56,14 @@ public final class ErasureCodingWorker {
public ErasureCodingWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.conf = conf;
this.xmitWeight = conf.getFloat(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT
);
Preconditions.checkArgument(this.xmitWeight >= 0,
"Invalid value configured for " +
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY +
", it can not be negative value (" + this.xmitWeight + ").");
initializeStripedReadThreadPool();
initializeStripedBlkReconstructionThreadPool(conf.getInt(
@@ -128,7 +138,7 @@ public final class ErasureCodingWorker {
// 1) NN will not send more tasks than what DN can execute and
// 2) DN will not throw away reconstruction tasks, and instead keeps
// an unbounded number of tasks in the executor's task queue.
- xmitsSubmitted = task.getXmits();
+ xmitsSubmitted = Math.max((int)(task.getXmits() * xmitWeight), 1);
getDatanode().incrementXmitsInProcess(xmitsSubmitted);
stripedReconstructionPool.submit(task);
} else {

StripedBlockReconstructor.java

@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.util.Time;
@@ -80,6 +81,7 @@ class StripedBlockReconstructor extends StripedReconstructor
void reconstruct() throws IOException {
while (getPositionInBlock() < getMaxTargetLength()) {
DataNodeFaultInjector.get().stripedBlockReconstruction();
long remaining = getMaxTargetLength() - getPositionInBlock();
final int toReconstructLen =
(int) Math.min(getStripedReader().getBufferSize(), remaining);

hdfs-default.xml

@@ -3072,6 +3072,19 @@
</description>
</property>
<property>
<name>dfs.datanode.ec.reconstruction.xmits.weight</name>
<value>0.5</value>
<description>
The DataNode uses this xmits weight to calculate the relative cost of EC recovery
tasks compared to replicated block recovery, whose xmits is always 1.
The NameNode then uses the xmits reported by DataNodes to throttle recovery tasks
for both EC and replicated blocks.
The xmits of an erasure coding recovery task is calculated as the maximum of
the number of read streams and the number of write streams.
</description>
</property>
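Pulling the DataNode-side pieces of the patch together, a rough consolidated sketch of how the weight is read, validated, and applied to a task's xmits; the standalone class and method names are illustrative, while the calls mirror the ErasureCodingWorker changes above:

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class EcXmitsWeightSketch {
  private final float xmitWeight;

  EcXmitsWeightSketch(Configuration conf) {
    // Read the weight (default 0.5) and reject negative values, as the patch does.
    this.xmitWeight = conf.getFloat(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT);
    Preconditions.checkArgument(xmitWeight >= 0,
        "Invalid value configured for "
            + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY
            + ", it can not be negative value (" + xmitWeight + ").");
  }

  /** Weighted xmits charged for one EC task; never drops below 1. */
  int weightedXmits(int taskXmits) {
    return Math.max((int) (taskXmits * xmitWeight), 1);
  }
}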
<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>

HDFSErasureCoding.md

@@ -136,6 +136,12 @@ Deployment
1. `dfs.datanode.ec.reconstruction.stripedread.timeout.millis` - Timeout for striped reads. Default value is 5000 ms.
1. `dfs.datanode.ec.reconstruction.stripedread.buffer.size` - Buffer size for reader service. Default value is 64KB.
1. `dfs.datanode.ec.reconstruction.threads` - Number of threads used by the Datanode for background reconstruction work. Default value is 8 threads.
1. `dfs.datanode.ec.reconstruction.xmits.weight` - Relative weight of xmits used by an EC background recovery task compared to replicated block recovery. Default value is 0.5.
Setting it to `0` disables weight calculation for EC recovery tasks, that is, an EC task always counts as `1` xmit.
The xmits of an erasure coding recovery task is calculated as the maximum of the number of read streams and the number of write streams. For example, if an EC recovery
task needs to read from 6 nodes and write to 2 nodes, its xmits is `max(6, 2) * 0.5 = 3` (see the worked sketch after this list). A recovery task for a replicated file always counts
as `1` xmit. The NameNode uses `dfs.namenode.replication.max-streams` minus the total `xmitsInProgress` on the DataNode, which combines the xmits from
both replicated and EC recovery tasks, when scheduling new recovery tasks to that DataNode.
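A small worked sketch of that arithmetic; the class and method names are hypothetical, but the formula matches the Math.max((int) (xmits * weight), 1) expression introduced in ErasureCodingWorker:

public final class EcXmitsArithmetic {
  // xmits charged for an EC recovery task that reads from `reads` nodes and
  // writes to `writes` nodes, under the configured weight.
  static int ecTaskXmits(int reads, int writes, float weight) {
    int rawXmits = Math.max(reads, writes);
    return Math.max((int) (rawXmits * weight), 1);
  }

  public static void main(String[] args) {
    System.out.println(ecTaskXmits(6, 2, 0.5f)); // 3, matching the example above
    System.out.println(ecTaskXmits(6, 2, 0f));   // 1, weight 0 disables weighting
    System.out.println(ecTaskXmits(6, 2, 10f));  // 60, large weights throttle EC recovery harder
  }
}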
### Enable Intel ISA-L

TestReconstructStripedFile.java

@@ -30,6 +30,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
@@ -53,6 +56,7 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
@@ -488,4 +492,64 @@ public class TestReconstructStripedFile {
500, 30000
);
}
@Test(timeout = 180000)
public void testErasureCodingWorkerXmitsWeight() throws Exception {
testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
testErasureCodingWorkerXmitsWeight(0f, 1);
testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
}
private void testErasureCodingWorkerXmitsWeight(
float weight, int expectedWeight)
throws Exception {
// Reset cluster with customized xmits weight.
conf.setFloat(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_KEY,
weight);
cluster.shutdown();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
cluster.waitActive();
fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
fs.getClient().setErasureCodingPolicy("/",
StripedFileTestUtil.getDefaultECPolicy().getName());
final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
writeFile(fs, "/ec-xmits-weight", fileLen);
DataNode dn = cluster.getDataNodes().get(0);
int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(
cluster.getNameNode().getNamesystem().getBlockPoolId()).size();
int expectedXmits = corruptBlocks * expectedWeight;
// One barrier party per block to reconstruct, plus one for the test thread.
final CyclicBarrier barrier = new CyclicBarrier(corruptBlocks + 1);
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
// Hold each reconstruction task at the barrier so its xmits stay in progress
// and can be observed below.
DataNodeFaultInjector delayInjector = new DataNodeFaultInjector() {
@Override
public void stripedBlockReconstruction() throws IOException {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new IOException(e);
}
}
};
DataNodeFaultInjector.set(delayInjector);
try {
shutdownDataNode(dn);
LambdaTestUtils.await(30 * 1000, 500,
() -> {
int totalXmits = cluster.getDataNodes().stream()
.mapToInt(DataNode::getXmitsInProgress).sum();
return totalXmits == expectedXmits;
}
);
} finally {
// Release the reconstruction tasks parked at the barrier.
barrier.await();
DataNodeFaultInjector.set(oldInjector);
}
}
}