HDFS-15240. Erasure Coding: dirty buffer causes reconstruction block error. Contributed by HuangTao.

This commit is contained in:
Hui Fei 2020-12-04 09:20:09 +08:00 committed by He Xiaoqiao
parent af437f1722
commit 6e215953e6
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
7 changed files with 316 additions and 3 deletions

View File

@ -96,6 +96,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
ByteBuffer.allocate(length); ByteBuffer.allocate(length);
} }
tree.remove(entry.getKey()); tree.remove(entry.getKey());
entry.getValue().clear();
return entry.getValue(); return entry.getValue();
} }

View File

@ -96,6 +96,22 @@ public class DataNodeFaultInjector {
*/ */
public void stripedBlockReconstruction() throws IOException {} public void stripedBlockReconstruction() throws IOException {}
/**
* Used as a hook to inject latency when read block
* in erasure coding reconstruction process.
*/
public void delayBlockReader() {}
/**
* Used as a hook to inject intercept when free the block reader buffer.
*/
public void interceptFreeBlockReaderBuffer() {}
/**
* Used as a hook to inject intercept When finish reading from block.
*/
public void interceptBlockReader() {}
/** /**
* Used as a hook to inject intercept when BPOfferService hold lock. * Used as a hook to inject intercept when BPOfferService hold lock.
*/ */

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats; import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -95,6 +96,7 @@ class StripedBlockReader {
} }
void freeReadBuffer() { void freeReadBuffer() {
DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
buffer = null; buffer = null;
} }
@ -179,6 +181,8 @@ class StripedBlockReader {
} catch (IOException e) { } catch (IOException e) {
LOG.info(e.getMessage()); LOG.info(e.getMessage());
throw e; throw e;
} finally {
DataNodeFaultInjector.get().interceptBlockReader();
} }
} }
}; };
@ -188,6 +192,7 @@ class StripedBlockReader {
* Perform actual reading of bytes from block. * Perform actual reading of bytes from block.
*/ */
private BlockReadStats actualReadFromBlock() throws IOException { private BlockReadStats actualReadFromBlock() throws IOException {
DataNodeFaultInjector.get().delayBlockReader();
int len = buffer.remaining(); int len = buffer.remaining();
int n = 0; int n = 0;
while (n < len) { while (n < len) {

View File

@ -43,6 +43,7 @@ import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
* Manage striped readers that performs reading of block data from remote to * Manage striped readers that performs reading of block data from remote to
@ -328,14 +329,14 @@ class StripedReader {
// cancel remaining reads if we read successfully from minimum // cancel remaining reads if we read successfully from minimum
// number of source DNs required by reconstruction. // number of source DNs required by reconstruction.
cancelReads(futures.keySet()); cancelReads(futures.keySet());
futures.clear(); clearFuturesAndService();
break; break;
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Read data interrupted.", e); LOG.info("Read data interrupted.", e);
cancelReads(futures.keySet()); cancelReads(futures.keySet());
futures.clear(); clearFuturesAndService();
break; break;
} }
} }
@ -429,6 +430,20 @@ class StripedReader {
} }
} }
// remove all stale futures from readService, and clear futures.
private void clearFuturesAndService() {
while (!futures.isEmpty()) {
try {
Future<BlockReadStats> future = readService.poll(
stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
);
futures.remove(future);
} catch (InterruptedException e) {
LOG.info("Clear stale futures from service is interrupted.", e);
}
}
}
void close() { void close() {
if (zeroStripeBuffers != null) { if (zeroStripeBuffers != null) {
for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) { for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
@ -438,9 +453,9 @@ class StripedReader {
zeroStripeBuffers = null; zeroStripeBuffers = null;
for (StripedBlockReader reader : readers) { for (StripedBlockReader reader : readers) {
reader.closeBlockReader();
reconstructor.freeBuffer(reader.getReadBuffer()); reconstructor.freeBuffer(reader.getReadBuffer());
reader.freeReadBuffer(); reader.freeReadBuffer();
reader.closeBlockReader();
} }
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.erasurecode; package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -279,4 +280,9 @@ abstract class StripedReconstructor {
public ErasureCodingWorker getErasureCodingWorker() { public ErasureCodingWorker getErasureCodingWorker() {
return erasureCodingWorker; return erasureCodingWorker;
} }
@VisibleForTesting
static ByteBufferPool getBufferPool() {
return BUFFER_POOL;
}
} }

View File

@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.BitSet; import java.util.BitSet;
@ -34,6 +35,12 @@ import java.util.Random;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -575,4 +582,237 @@ public class TestReconstructStripedFile {
} }
} }
} }
/**
* When the StripedBlockReader timeout, the outdated future should be ignored.
* Or the NPE will be thrown, which will stop reading the remaining data, and
* the reconstruction task will fail.
*/
@Test(timeout = 120000)
public void testTimeoutReadBlockInReconstruction() throws Exception {
assumeTrue("Ignore case where num parity units <= 1",
ecPolicy.getNumParityUnits() > 1);
int stripedBufferSize = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
cellSize);
ErasureCodingPolicy policy = ecPolicy;
fs.enableErasureCodingPolicy(policy.getName());
fs.getClient().setErasureCodingPolicy("/", policy.getName());
// StripedBlockReconstructor#reconstruct will loop 2 times
final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
String fileName = "/timeout-read-block";
Path file = new Path(fileName);
writeFile(fs, fileName, fileLen);
fs.getFileBlockLocations(file, 0, fileLen);
LocatedBlocks locatedBlocks =
StripedFileTestUtil.getLocatedBlocks(file, fs);
Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
// The file only has one block group
LocatedBlock lblock = locatedBlocks.get(0);
DatanodeInfo[] datanodeinfos = lblock.getLocations();
// to reconstruct first block
DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
int stripedReadTimeoutInMills = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.
DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
Assert.assertTrue(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+ " must be greater than 2000",
stripedReadTimeoutInMills > 2000);
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
private AtomicInteger numDelayReader = new AtomicInteger(0);
@Override
public void delayBlockReader() {
int index = numDelayReader.incrementAndGet();
LOG.info("Delay the {}th read block", index);
// the file's first StripedBlockReconstructor#reconstruct,
// and the first reader will timeout
if (index == 1) {
try {
GenericTestUtils.waitFor(() -> numDelayReader.get() >=
ecPolicy.getNumDataUnits() + 1, 50,
stripedReadTimeoutInMills * 3
);
} catch (TimeoutException e) {
Assert.fail("Can't reconstruct the file's first part.");
} catch (InterruptedException e) {
}
}
// stop all the following re-reconstruction tasks
if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
};
DataNodeFaultInjector.set(timeoutInjector);
try {
shutdownDataNode(dataNode);
// before HDFS-15240, NPE will cause reconstruction fail(test timeout)
StripedFileTestUtil
.waitForReconstructionFinished(file, fs, groupSize);
} finally {
DataNodeFaultInjector.set(oldInjector);
}
}
/**
* When block reader timeout, the outdated future should be ignored.
* Or the ByteBuffer would be wrote after giving back to the BufferPool.
* This UT is used to ensure that we should close block reader
* before freeing the buffer.
*/
@Test(timeout = 120000)
public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
assumeTrue("Ignore case where num parity units <= 1",
ecPolicy.getNumParityUnits() > 1);
int stripedBufferSize = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
cellSize);
// StripedBlockReconstructor#reconstruct will loop 2 times
final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
String fileName = "/no-dirty-buffer";
Path file = new Path(fileName);
writeFile(fs, fileName, fileLen);
fs.getFileBlockLocations(file, 0, fileLen);
LocatedBlocks locatedBlocks =
StripedFileTestUtil.getLocatedBlocks(file, fs);
Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
// The file only has one block group
LocatedBlock lblock = locatedBlocks.get(0);
DatanodeInfo[] datanodeinfos = lblock.getLocations();
// to reconstruct first block
DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
int stripedReadTimeoutInMills = conf.getInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
DFSConfigKeys.
DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
Assert.assertTrue(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+ " must be greater than 2000",
stripedReadTimeoutInMills > 2000);
ElasticByteBufferPool bufferPool =
(ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
emptyBufferPool(bufferPool, true);
emptyBufferPool(bufferPool, false);
AtomicInteger finishedReadBlock = new AtomicInteger(0);
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
private AtomicInteger numDelayReader = new AtomicInteger(0);
private AtomicBoolean continueRead = new AtomicBoolean(false);
private AtomicBoolean closeByNPE = new AtomicBoolean(false);
@Override
public void delayBlockReader() {
int index = numDelayReader.incrementAndGet();
LOG.info("Delay the {}th read block", index);
// the file's first StripedBlockReconstructor#reconstruct,
// and the first reader will timeout
if (index == 1) {
try {
GenericTestUtils.waitFor(() -> numDelayReader.get() >=
ecPolicy.getNumDataUnits() + 1, 50,
stripedReadTimeoutInMills * 3
);
} catch (TimeoutException e) {
Assert.fail("Can't reconstruct the file's first part.");
} catch (InterruptedException e) {
}
}
if (index > ecPolicy.getNumDataUnits() + 1) {
try {
GenericTestUtils.waitFor(
() -> {
LOG.info("Close by NPE: {}, continue read: {}",
closeByNPE, continueRead);
return closeByNPE.get() ? continueRead.get()
: index == finishedReadBlock.get() + 1; }, 5,
stripedReadTimeoutInMills * 3
);
} catch (TimeoutException e) {
Assert.fail("Can't reconstruct the file's remaining part.");
} catch (InterruptedException e) {
}
}
}
@Override
public void interceptBlockReader() {
int n = finishedReadBlock.incrementAndGet();
LOG.info("Intercept the end of {}th read block.", n);
}
private AtomicInteger numFreeBuffer = new AtomicInteger(0);
@Override
public void interceptFreeBlockReaderBuffer() {
closeByNPE.compareAndSet(false, true);
int num = numFreeBuffer.incrementAndGet();
LOG.info("Intercept the {} free block buffer.", num);
if (num >= ecPolicy.getNumDataUnits() + 1) {
continueRead.compareAndSet(false, true);
try {
GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
2 * ecPolicy.getNumDataUnits() + 1, 50,
stripedReadTimeoutInMills * 3
);
} catch (TimeoutException e) {
Assert.fail("Can't finish the file's reconstruction.");
} catch (InterruptedException e) {
}
}
}
};
DataNodeFaultInjector.set(timeoutInjector);
try {
shutdownDataNode(dataNode);
// at least one timeout reader
GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
2 * ecPolicy.getNumDataUnits() + 1, 50,
stripedReadTimeoutInMills * 3
);
assertBufferPoolIsEmpty(bufferPool, false);
assertBufferPoolIsEmpty(bufferPool, true);
StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
} finally {
DataNodeFaultInjector.set(oldInjector);
}
}
private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
boolean direct) {
while (bufferPool.size(direct) != 0) {
// iterate all ByteBuffers in ElasticByteBufferPool
ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
Assert.assertEquals(0, byteBuffer.position());
}
}
private void emptyBufferPool(ElasticByteBufferPool bufferPool,
boolean direct) {
while (bufferPool.size(direct) != 0) {
bufferPool.getBuffer(direct, 0);
}
}
} }

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import org.apache.hadoop.io.ByteBufferPool;
public final class ErasureCodingTestHelper {
private ErasureCodingTestHelper() { }
public static ByteBufferPool getBufferPool() {
return StripedReconstructor.getBufferPool();
}
}