HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu.

This commit is contained in:
Kihwal Lee 2020-06-12 16:19:36 -05:00
parent a11c606910
commit 17109758dd
1 changed file with 97 additions and 52 deletions

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
@ -83,6 +87,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -777,12 +782,11 @@ public class TestDataNodeHotSwapVolumes {
private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
throws IOException, ReconfigurationException, TimeoutException,
InterruptedException, BrokenBarrierException {
// Starts DFS cluster with 3 DataNodes to form a pipeline.
startDFSCluster(1, 3);
startDFSCluster(1, 4);
final short REPLICATION = 3;
final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
final FileSystem fs = cluster.getFileSystem();
final DistributedFileSystem fs = cluster.getFileSystem();
final DFSClient client = fs.getClient();
final Path testFile = new Path("/test");
FSDataOutputStream out = fs.create(testFile, REPLICATION);
@ -792,54 +796,102 @@ public class TestDataNodeHotSwapVolumes {
out.write(writeBuf);
out.hflush();
// Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
// BlockReceiver releases volume reference before finalizeBlock(), the blocks
// on the volume will be removed, and finalizeBlock() throws IOE.
final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
dn.data = Mockito.spy(data);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation)
throws IOException, InterruptedException {
Thread.sleep(1000);
// Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
// the block is not removed, since the volume reference should not
// be released at this point.
data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
(boolean) invocation.getArguments()[1]);
return null;
}
}).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
Mockito.anyBoolean());
BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
String[] dataNodeNames = blocks[0].getNames();
String dataNodeName = dataNodeNames[dataNodeIdx];
int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
DataNode dn = null;
for (DataNode dataNode : cluster.getDataNodes()) {
if (dataNode.getXferPort() == xferPort) {
dn = dataNode;
break;
}
}
assertNotNull(dn);
final CyclicBarrier barrier = new CyclicBarrier(2);
List<String> oldDirs = getDataDirs(dn);
final String newDirs = oldDirs.get(1); // Remove the first volume.
final List<Exception> exceptions = new ArrayList<>();
Thread reconfigThread = new Thread() {
public void run() {
final CyclicBarrier barrier = new CyclicBarrier(4);
final AtomicBoolean done = new AtomicBoolean(false);
DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
public void logDelaySendingAckToUpstream(
final String upstreamAddr, final long delayMs) throws IOException {
try {
barrier.await();
assertThat(
"DN did not update its own config",
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
} catch (ReconfigurationException |
InterruptedException |
BrokenBarrierException e) {
exceptions.add(e);
// Make all streams that hold the volume references wait for the
// reconfiguration thread to start.
// IO should only be blocked while the reconfiguration
// task is running.
if (!done.get()) {
barrier.await();
// Add a delay so the reconfiguration thread can start before
// the IO finishes.
Thread.sleep(1000);
}
} catch (InterruptedException | BrokenBarrierException e) {
throw new IOException(e);
}
}
};
reconfigThread.start();
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
barrier.await();
rb.nextBytes(writeBuf);
out.write(writeBuf);
out.hflush();
out.close();
try {
DataNodeFaultInjector.set(newInjector);
reconfigThread.join();
List<String> oldDirs = getDataDirs(dn);
LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
LocatedBlock block = lbs.get(0);
FsVolumeImpl volume =
(FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
StringBuffer newDirsBuf = new StringBuffer();
String delim = "";
for (String d : oldDirs) {
if (! d.contains(volume.getBasePath())) {
newDirsBuf.append(delim).append(d);
delim = ",";
}
}
final String newDirs = newDirsBuf.toString();
final List<IOException> exceptions = new ArrayList<>();
final DataNode dataNode = dn;
final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
Thread reconfigThread = new Thread(new Runnable() {
@Override
public void run() {
try {
reconfigBarrier.await();
// Wake up writing threads on the pipeline to finish the block.
barrier.await();
assertThat(
"DN did not update its own config",
dataNode.reconfigurePropertyImpl(
DFS_DATANODE_DATA_DIR_KEY, newDirs),
is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
done.set(true);
} catch (ReconfigurationException |
InterruptedException |
BrokenBarrierException e) {
exceptions.add(new IOException(e));
}
}
});
reconfigThread.start();
// Write more data to make sure the stream threads wait on the barrier.
rb.nextBytes(writeBuf);
out.write(writeBuf);
reconfigBarrier.await();
out.hflush();
out.close();
reconfigThread.join();
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
} finally {
DataNodeFaultInjector.set(oldInjector);
}
// Verify if the data directory reconfigure was successful
FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
@ -852,19 +904,12 @@ public class TestDataNodeHotSwapVolumes {
1, fsVolumeReferences.size());
}
// Add a new DataNode to help with the pipeline recover.
cluster.startDataNodes(conf, 1, true, null, null, null);
// Verify the file has sufficient replications.
DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
// Read the content back
byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
assertEquals(BLOCK_SIZE, content.length);
if (!exceptions.isEmpty()) {
throw new IOException(exceptions.get(0).getCause());
}
// Write more files to make sure that the DataNode that has removed volume
// is still alive to receive data.
for (int i = 0; i < 10; i++) {