diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 2e770cc1c73..e7cd0d827ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -83,6 +83,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -811,10 +812,30 @@ public class DFSUtilClient { public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix, boolean runRejectedExec) { + return getThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeSecs, + new SynchronousQueue<>(), threadNamePrefix, runRejectedExec); +} + + /** + * Utility to create a {@link ThreadPoolExecutor}. + * + * @param corePoolSize - min threads in the pool, even if idle + * @param maxPoolSize - max threads in the pool + * @param keepAliveTimeSecs - max seconds beyond which excess idle threads + * will be terminated + * @param queue - the queue to use for holding tasks before they are executed. + * @param threadNamePrefix - name prefix for the pool threads + * @param runRejectedExec - when true, rejected tasks from + * ThreadPoolExecutor are run in the context of calling thread + * @return ThreadPoolExecutor + */ + public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, + int maxPoolSize, long keepAliveTimeSecs, BlockingQueue queue, + String threadNamePrefix, boolean runRejectedExec) { Preconditions.checkArgument(corePoolSize > 0); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS, - new SynchronousQueue(), new Daemon.DaemonFactory() { + queue, new Daemon.DaemonFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 27303937c91..60694876331 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2203,6 +2203,17 @@ public class DataNode extends ReconfigurableBase xmitsInProgress.getAndIncrement(); } + /** + * Increments the xmitInProgress count by given value. + * + * @param delta the amount of xmitsInProgress to increase. + * @see #incrementXmitsInProgress() + */ + public void incrementXmitsInProcess(int delta) { + Preconditions.checkArgument(delta >= 0); + xmitsInProgress.getAndAdd(delta); + } + /** * Decrements the xmitsInProgress count */ @@ -2210,6 +2221,16 @@ public class DataNode extends ReconfigurableBase xmitsInProgress.getAndDecrement(); } + /** + * Decrements the xmitsInProgress count by given value. + * + * @see #decrementXmitsInProgress() + */ + public void decrementXmitsInProgress(int delta) { + Preconditions.checkArgument(delta >= 0); + xmitsInProgress.getAndAdd(0 - delta); + } + private void reportBadBlock(final BPOfferService bpos, final ExtendedBlock block, final String msg) { FsVolumeSpi volume = getFSDataset().getVolume(block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index e076dda9809..72c224f2f77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -27,6 +27,7 @@ import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,7 +94,8 @@ public final class ErasureCodingWorker { LOG.debug("Using striped block reconstruction; pool threads={}", numThreads); stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2, - numThreads, 60, "StripedBlockReconstruction-", false); + numThreads, 60, new LinkedBlockingQueue<>(), + "StripedBlockReconstruction-", false); stripedReconstructionPool.allowCoreThreadTimeOut(true); } @@ -106,6 +108,7 @@ public final class ErasureCodingWorker { public void processErasureCodingTasks( Collection ecTasks) { for (BlockECReconstructionInfo reconInfo : ecTasks) { + int xmitsSubmitted = 0; try { StripedReconstructionInfo stripedReconInfo = new StripedReconstructionInfo( @@ -113,15 +116,25 @@ public final class ErasureCodingWorker { reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(), reconInfo.getTargetStorageIDs()); + // It may throw IllegalArgumentException from task#stripedReader + // constructor. final StripedBlockReconstructor task = new StripedBlockReconstructor(this, stripedReconInfo); if (task.hasValidTargets()) { + // See HDFS-12044. We increase xmitsInProgress even the task is only + // enqueued, so that + // 1) NN will not send more tasks than what DN can execute and + // 2) DN will not throw away reconstruction tasks, and instead keeps + // an unbounded number of tasks in the executor's task queue. + xmitsSubmitted = task.getXmits(); + getDatanode().incrementXmitsInProcess(xmitsSubmitted); stripedReconstructionPool.submit(task); } else { LOG.warn("No missing internal block. Skip reconstruction for task:{}", reconInfo); } } catch (Throwable e) { + getDatanode().decrementXmitsInProgress(xmitsSubmitted); LOG.warn("Failed to reconstruct striped block {}", reconInfo.getExtendedBlock().getLocalBlock(), e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index 1119bbbd230..bac013aea29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -48,7 +48,6 @@ class StripedBlockReconstructor extends StripedReconstructor @Override public void run() { - getDatanode().incrementXmitsInProgress(); try { initDecoderIfNecessary(); @@ -66,7 +65,7 @@ class StripedBlockReconstructor extends StripedReconstructor LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e); getDatanode().getMetrics().incrECFailedReconstructionTasks(); } finally { - getDatanode().decrementXmitsInProgress(); + getDatanode().decrementXmitsInProgress(getXmits()); final DataNodeMetrics metrics = getDatanode().getMetrics(); metrics.incrECReconstructionTasks(); metrics.incrECReconstructionBytesRead(getBytesRead()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java index f6f343a6bf3..96f97915455 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java @@ -68,6 +68,8 @@ class StripedReader { private int[] successList; private final int minRequiredSources; + // the number of xmits used by the re-construction task. + private final int xmits; // The buffers and indices for striped blocks whose length is 0 private ByteBuffer[] zeroStripeBuffers; private short[] zeroStripeIndices; @@ -107,6 +109,12 @@ class StripedReader { zeroStripeIndices = new short[zeroStripNum]; } + // It is calculated by the maximum number of connections from either sources + // or targets. + xmits = Math.max(minRequiredSources, + stripedReconInfo.getTargets() != null ? + stripedReconInfo.getTargets().length : 0); + this.liveIndices = stripedReconInfo.getLiveIndices(); assert liveIndices != null; this.sources = stripedReconInfo.getSources(); @@ -472,4 +480,16 @@ class StripedReader { CachingStrategy getCachingStrategy() { return reconstructor.getCachingStrategy(); } + + /** + * Return the xmits of this EC reconstruction task. + *

+ * DN uses it to coordinate with NN to adjust the speed of scheduling the + * EC reconstruction tasks to this DN. + * + * @return the xmits of this reconstruction task. + */ + int getXmits() { + return xmits; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java index a619c34781c..0a3e12546df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java @@ -103,5 +103,20 @@ public class StripedReconstructionInfo { String[] getTargetStorageIds() { return targetStorageIds; } + + /** + * Return the weight of this EC reconstruction task. + * + * DN uses it to coordinate with NN to adjust the speed of scheduling the + * reconstructions tasks to this DN. + * + * @return the weight of this reconstruction task. + * @see HDFS-12044 + */ + int getWeight() { + // See HDFS-12044. The weight of a RS(n, k) is calculated by the network + // connections it opens. + return sources.length + targets.length; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index b8433c7b6c3..3202121b62e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -133,7 +133,6 @@ abstract class StripedReconstructor { } blockGroup = stripedReconInfo.getBlockGroup(); stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo); - cachingStrategy = CachingStrategy.newDefaultStrategy(); positionInBlock = 0L; @@ -233,6 +232,13 @@ abstract class StripedReconstructor { return blockGroup; } + /** + * Get the xmits that _will_ be used for this reconstruction task. + */ + int getXmits() { + return stripedReader.getXmits(); + } + BitSet getLiveBitSet() { return liveBitSet; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 29e4028f559..7cd34c2acd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; @@ -81,6 +83,7 @@ public class TestReconstructStripedFile { Any } + private Configuration conf; private MiniDFSCluster cluster; private DistributedFileSystem fs; // Map: DatanodeID -> datanode index in cluster @@ -89,7 +92,7 @@ public class TestReconstructStripedFile { @Before public void setup() throws IOException { - final Configuration conf = new Configuration(); + conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt( DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, @@ -263,6 +266,14 @@ public class TestReconstructStripedFile { return stoppedDNs; } + private static void writeFile(DistributedFileSystem fs, String fileName, + int fileLen) throws Exception { + final byte[] data = new byte[fileLen]; + Arrays.fill(data, (byte) 1); + DFSTestUtil.writeFile(fs, new Path(fileName), data); + StripedFileTestUtil.waitBlockGroupsReported(fs, fileName); + } + /** * Test the file blocks reconstruction. * 1. Check the replica is reconstructed in the target datanode, @@ -278,10 +289,7 @@ public class TestReconstructStripedFile { Path file = new Path(fileName); - final byte[] data = new byte[fileLen]; - Arrays.fill(data, (byte) 1); - DFSTestUtil.writeFile(fs, file, data); - StripedFileTestUtil.waitBlockGroupsReported(fs, fileName); + writeFile(fs, fileName, fileLen); LocatedBlocks locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs); @@ -424,4 +432,60 @@ public class TestReconstructStripedFile { ecTasks.add(invalidECInfo); dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks); } + + // HDFS-12044 + @Test(timeout = 60000) + public void testNNSendsErasureCodingTasks() throws Exception { + testNNSendsErasureCodingTasks(1); + testNNSendsErasureCodingTasks(2); + } + + private void testNNSendsErasureCodingTasks(int deadDN) throws Exception { + cluster.shutdown(); + + final int numDataNodes = dnNum + 1; + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 10); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 20); + conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY, + 2); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDataNodes).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + ErasureCodingPolicy policy = StripedFileTestUtil.getDefaultECPolicy(); + fs.getClient().setErasureCodingPolicy("/", policy.getName()); + + final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2; + for (int i = 0; i < 100; i++) { + writeFile(fs, "/ec-file-" + i, fileLen); + } + + // Inject data-loss by tear down desired number of DataNodes. + assertTrue(policy.getNumParityUnits() >= deadDN); + List dataNodes = new ArrayList<>(cluster.getDataNodes()); + Collections.shuffle(dataNodes); + for (DataNode dn : dataNodes.subList(0, deadDN)) { + shutdownDataNode(dn); + } + + final FSNamesystem ns = cluster.getNamesystem(); + GenericTestUtils.waitFor(() -> ns.getPendingDeletionBlocks() == 0, + 500, 30000); + + // Make sure that all pending reconstruction tasks can be processed. + while (ns.getPendingReconstructionBlocks() > 0) { + long timeoutPending = ns.getNumTimedOutPendingReconstructions(); + assertTrue(String.format("Found %d timeout pending reconstruction tasks", + timeoutPending), timeoutPending == 0); + Thread.sleep(1000); + } + + // Verify all DN reaches zero xmitsInProgress. + GenericTestUtils.waitFor(() -> + cluster.getDataNodes().stream().mapToInt( + DataNode::getXmitsInProgress).sum() == 0, + 500, 30000 + ); + } }