diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index bbedf2a2dc3..0350db35512 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -96,6 +96,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool { ByteBuffer.allocate(length); } tree.remove(entry.getKey()); + entry.getValue().clear(); return entry.getValue(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 7e661117725..08123c17bc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -96,6 +96,22 @@ public class DataNodeFaultInjector { */ public void stripedBlockReconstruction() throws IOException {} + /** + * Used as a hook to inject latency when read block + * in erasure coding reconstruction process. + */ + public void delayBlockReader() {} + + /** + * Used as a hook to inject intercept when free the block reader buffer. + */ + public void interceptFreeBlockReaderBuffer() {} + + /** + * Used as a hook to inject intercept When finish reading from block. + */ + public void interceptBlockReader() {} + /** * Used as a hook to inject intercept when BPOfferService hold lock. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java index 0db8a6f499b..ff3306b3b38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; @@ -95,6 +96,7 @@ class StripedBlockReader { } void freeReadBuffer() { + DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer(); buffer = null; } @@ -179,6 +181,8 @@ class StripedBlockReader { } catch (IOException e) { LOG.info(e.getMessage()); throw e; + } finally { + DataNodeFaultInjector.get().interceptBlockReader(); } } }; @@ -188,6 +192,7 @@ class StripedBlockReader { * Perform actual reading of bytes from block. */ private BlockReadStats actualReadFromBlock() throws IOException { + DataNodeFaultInjector.get().delayBlockReader(); int len = buffer.remaining(); int n = 0; while (n < len) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java index 98edf724a8e..070931ca2c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * Manage striped readers that performs reading of block data from remote to @@ -328,14 +329,14 @@ class StripedReader { // cancel remaining reads if we read successfully from minimum // number of source DNs required by reconstruction. cancelReads(futures.keySet()); - futures.clear(); + clearFuturesAndService(); break; } } } catch (InterruptedException e) { LOG.info("Read data interrupted.", e); cancelReads(futures.keySet()); - futures.clear(); + clearFuturesAndService(); break; } } @@ -429,6 +430,20 @@ class StripedReader { } } + // remove all stale futures from readService, and clear futures. + private void clearFuturesAndService() { + while (!futures.isEmpty()) { + try { + Future future = readService.poll( + stripedReadTimeoutInMills, TimeUnit.MILLISECONDS + ); + futures.remove(future); + } catch (InterruptedException e) { + LOG.info("Clear stale futures from service is interrupted.", e); + } + } + } + void close() { if (zeroStripeBuffers != null) { for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) { @@ -438,9 +453,9 @@ class StripedReader { zeroStripeBuffers = null; for (StripedBlockReader reader : readers) { + reader.closeBlockReader(); reconstructor.freeBuffer(reader.getReadBuffer()); reader.freeReadBuffer(); - reader.closeBlockReader(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 4c8be827f43..48a07471864 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode.erasurecode; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -279,4 +280,9 @@ abstract class StripedReconstructor { public ErasureCodingWorker getErasureCodingWorker() { return erasureCodingWorker; } + + @VisibleForTesting + static ByteBufferPool getBufferPool() { + return BUFFER_POOL; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index b0b33500e69..16ce0dd58f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -34,6 +35,12 @@ import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -575,4 +582,237 @@ public class TestReconstructStripedFile { } } } + + /** + * When the StripedBlockReader timeout, the outdated future should be ignored. + * Or the NPE will be thrown, which will stop reading the remaining data, and + * the reconstruction task will fail. + */ + @Test(timeout = 120000) + public void testTimeoutReadBlockInReconstruction() throws Exception { + assumeTrue("Ignore case where num parity units <= 1", + ecPolicy.getNumParityUnits() > 1); + int stripedBufferSize = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + cellSize); + ErasureCodingPolicy policy = ecPolicy; + fs.enableErasureCodingPolicy(policy.getName()); + fs.getClient().setErasureCodingPolicy("/", policy.getName()); + + // StripedBlockReconstructor#reconstruct will loop 2 times + final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits(); + String fileName = "/timeout-read-block"; + Path file = new Path(fileName); + writeFile(fs, fileName, fileLen); + fs.getFileBlockLocations(file, 0, fileLen); + + LocatedBlocks locatedBlocks = + StripedFileTestUtil.getLocatedBlocks(file, fs); + Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + // The file only has one block group + LocatedBlock lblock = locatedBlocks.get(0); + DatanodeInfo[] datanodeinfos = lblock.getLocations(); + + // to reconstruct first block + DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort()); + + int stripedReadTimeoutInMills = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys. + DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + Assert.assertTrue( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY + + " must be greater than 2000", + stripedReadTimeoutInMills > 2000); + + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() { + private AtomicInteger numDelayReader = new AtomicInteger(0); + + @Override + public void delayBlockReader() { + int index = numDelayReader.incrementAndGet(); + LOG.info("Delay the {}th read block", index); + + // the file's first StripedBlockReconstructor#reconstruct, + // and the first reader will timeout + if (index == 1) { + try { + GenericTestUtils.waitFor(() -> numDelayReader.get() >= + ecPolicy.getNumDataUnits() + 1, 50, + stripedReadTimeoutInMills * 3 + ); + } catch (TimeoutException e) { + Assert.fail("Can't reconstruct the file's first part."); + } catch (InterruptedException e) { + } + } + // stop all the following re-reconstruction tasks + if (index > 3 * ecPolicy.getNumDataUnits() + 1) { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + } + }; + DataNodeFaultInjector.set(timeoutInjector); + + try { + shutdownDataNode(dataNode); + // before HDFS-15240, NPE will cause reconstruction fail(test timeout) + StripedFileTestUtil + .waitForReconstructionFinished(file, fs, groupSize); + } finally { + DataNodeFaultInjector.set(oldInjector); + } + } + + /** + * When block reader timeout, the outdated future should be ignored. + * Or the ByteBuffer would be wrote after giving back to the BufferPool. + * This UT is used to ensure that we should close block reader + * before freeing the buffer. + */ + @Test(timeout = 120000) + public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception { + assumeTrue("Ignore case where num parity units <= 1", + ecPolicy.getNumParityUnits() > 1); + int stripedBufferSize = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + cellSize); + // StripedBlockReconstructor#reconstruct will loop 2 times + final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits(); + String fileName = "/no-dirty-buffer"; + Path file = new Path(fileName); + writeFile(fs, fileName, fileLen); + fs.getFileBlockLocations(file, 0, fileLen); + + LocatedBlocks locatedBlocks = + StripedFileTestUtil.getLocatedBlocks(file, fs); + Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + // The file only has one block group + LocatedBlock lblock = locatedBlocks.get(0); + DatanodeInfo[] datanodeinfos = lblock.getLocations(); + + // to reconstruct first block + DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort()); + + int stripedReadTimeoutInMills = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys. + DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + Assert.assertTrue( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY + + " must be greater than 2000", + stripedReadTimeoutInMills > 2000); + + ElasticByteBufferPool bufferPool = + (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool(); + emptyBufferPool(bufferPool, true); + emptyBufferPool(bufferPool, false); + + AtomicInteger finishedReadBlock = new AtomicInteger(0); + + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() { + private AtomicInteger numDelayReader = new AtomicInteger(0); + private AtomicBoolean continueRead = new AtomicBoolean(false); + private AtomicBoolean closeByNPE = new AtomicBoolean(false); + + @Override + public void delayBlockReader() { + int index = numDelayReader.incrementAndGet(); + LOG.info("Delay the {}th read block", index); + + // the file's first StripedBlockReconstructor#reconstruct, + // and the first reader will timeout + if (index == 1) { + try { + GenericTestUtils.waitFor(() -> numDelayReader.get() >= + ecPolicy.getNumDataUnits() + 1, 50, + stripedReadTimeoutInMills * 3 + ); + } catch (TimeoutException e) { + Assert.fail("Can't reconstruct the file's first part."); + } catch (InterruptedException e) { + } + } + if (index > ecPolicy.getNumDataUnits() + 1) { + try { + GenericTestUtils.waitFor( + () -> { + LOG.info("Close by NPE: {}, continue read: {}", + closeByNPE, continueRead); + return closeByNPE.get() ? continueRead.get() + : index == finishedReadBlock.get() + 1; }, 5, + stripedReadTimeoutInMills * 3 + ); + } catch (TimeoutException e) { + Assert.fail("Can't reconstruct the file's remaining part."); + } catch (InterruptedException e) { + } + } + } + + @Override + public void interceptBlockReader() { + int n = finishedReadBlock.incrementAndGet(); + LOG.info("Intercept the end of {}th read block.", n); + } + + private AtomicInteger numFreeBuffer = new AtomicInteger(0); + @Override + public void interceptFreeBlockReaderBuffer() { + closeByNPE.compareAndSet(false, true); + int num = numFreeBuffer.incrementAndGet(); + LOG.info("Intercept the {} free block buffer.", num); + if (num >= ecPolicy.getNumDataUnits() + 1) { + continueRead.compareAndSet(false, true); + try { + GenericTestUtils.waitFor(() -> finishedReadBlock.get() >= + 2 * ecPolicy.getNumDataUnits() + 1, 50, + stripedReadTimeoutInMills * 3 + ); + } catch (TimeoutException e) { + Assert.fail("Can't finish the file's reconstruction."); + } catch (InterruptedException e) { + } + } + } + }; + DataNodeFaultInjector.set(timeoutInjector); + try { + shutdownDataNode(dataNode); + // at least one timeout reader + GenericTestUtils.waitFor(() -> finishedReadBlock.get() >= + 2 * ecPolicy.getNumDataUnits() + 1, 50, + stripedReadTimeoutInMills * 3 + ); + + assertBufferPoolIsEmpty(bufferPool, false); + assertBufferPoolIsEmpty(bufferPool, true); + StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize); + } finally { + DataNodeFaultInjector.set(oldInjector); + } + } + + private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool, + boolean direct) { + while (bufferPool.size(direct) != 0) { + // iterate all ByteBuffers in ElasticByteBufferPool + ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0); + Assert.assertEquals(0, byteBuffer.position()); + } + } + + private void emptyBufferPool(ElasticByteBufferPool bufferPool, + boolean direct) { + while (bufferPool.size(direct) != 0) { + bufferPool.getBuffer(direct, 0); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java new file mode 100644 index 00000000000..da571055109 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import org.apache.hadoop.io.ByteBufferPool; + +public final class ErasureCodingTestHelper { + + private ErasureCodingTestHelper() { } + + public static ByteBufferPool getBufferPool() { + return StripedReconstructor.getBufferPool(); + } +}