diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 5d4ac1e1569..df5e29775a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -68,6 +72,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -83,6 +88,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -775,12 +781,11 @@ public class TestDataNodeHotSwapVolumes {
   private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
       throws IOException, ReconfigurationException, TimeoutException,
       InterruptedException, BrokenBarrierException {
-    // Starts DFS cluster with 3 DataNodes to form a pipeline.
-    startDFSCluster(1, 3);
+    // Start the cluster with one extra DataNode, so that the write pipeline
+    // can recover after a volume is removed from one of its members.
+    startDFSCluster(1, 4);
     final short REPLICATION = 3;
-    final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
-    final FileSystem fs = cluster.getFileSystem();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    final DFSClient client = fs.getClient();
     final Path testFile = new Path("/test");
     FSDataOutputStream out = fs.create(testFile, REPLICATION);
@@ -790,54 +795,93 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
-    // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
-    // BlockReceiver releases volume reference before finalizeBlock(), the blocks
-    // on the volume will be removed, and finalizeBlock() throws IOE.
-    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
-    dn.data = Mockito.spy(data);
-    doAnswer(new Answer<Object>() {
-          public Object answer(InvocationOnMock invocation)
-              throws IOException, InterruptedException {
-            Thread.sleep(1000);
-            // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
-            // the block is not removed, since the volume reference should not
-            // be released at this point.
-            data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0],
-                (boolean) invocation.getArguments()[1]);
-            return null;
-          }
-        }).when(dn.data).finalizeBlock(any(ExtendedBlock.class),
-            Mockito.anyBoolean());
+    // Locate the DataNode at position dataNodeIdx in the write pipeline.
+    BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
+    String[] dataNodeNames = blocks[0].getNames();
+    String dataNodeName = dataNodeNames[dataNodeIdx];
+    int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
+    DataNode dn = null;
+    for (DataNode dataNode : cluster.getDataNodes()) {
+      if (dataNode.getXferPort() == xferPort) {
+        dn = dataNode;
+        break;
+      }
+    }
+    assertNotNull(dn);
 
-    final CyclicBarrier barrier = new CyclicBarrier(2);
-
-    List<String> oldDirs = getDataDirs(dn);
-    final String newDirs = oldDirs.get(1);  // Remove the first volume.
-    final List<Exception> exceptions = new ArrayList<>();
-    Thread reconfigThread = new Thread() {
-      public void run() {
+    final CyclicBarrier barrier = new CyclicBarrier(4);
+    final AtomicBoolean done = new AtomicBoolean(false);
+    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
+      public void logDelaySendingAckToUpstream(
+          final String upstreamAddr, final long delayMs) throws IOException {
         try {
-          barrier.await();
-          assertThat(
-              "DN did not update its own config",
-              dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
-              is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
-        } catch (ReconfigurationException |
-            InterruptedException |
-            BrokenBarrierException e) {
-          exceptions.add(e);
+          // Make all streams that hold a volume reference wait for the
+          // reconfiguration thread to start. IO should only be blocked
+          // while the reconfiguration task is running.
+          if (!done.get()) {
+            barrier.await();
+            // Sleep so that the reconfiguration thread can start before
+            // the IO finishes.
+            Thread.sleep(1000);
+          }
+        } catch (InterruptedException | BrokenBarrierException e) {
+          throw new IOException(e);
         }
       }
     };
-    reconfigThread.start();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
 
-    barrier.await();
-    rb.nextBytes(writeBuf);
-    out.write(writeBuf);
-    out.hflush();
-    out.close();
+    try {
+      DataNodeFaultInjector.set(newInjector);
 
-    reconfigThread.join();
+      List<String> oldDirs = getDataDirs(dn);
+      LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
+      LocatedBlock block = lbs.get(0);
+      // Remove the volume that stores the block being written.
+      FsVolumeImpl volume =
+          (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
+      final String newDirs = oldDirs.stream()
+          .filter((d) -> !d.contains(volume.getStorageLocation().toString()))
+          .collect(Collectors.joining(","));
+      final List<IOException> exceptions = new ArrayList<>();
+      final DataNode dataNode = dn;
+      final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
+
+      Thread reconfigThread = new Thread(() -> {
+        try {
+          reconfigBarrier.await();
+
+          // Wake up the writing threads on the pipeline so that they can
+          // finish the block.
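+          // The barrier has four parties: the three DataNodes in the
+          // pipeline, each blocked in logDelaySendingAckToUpstream while
+          // holding its volume reference, plus this reconfiguration thread.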
+          barrier.await();
+
+          assertThat(
+              "DN did not update its own config",
+              dataNode.reconfigurePropertyImpl(
+                  DFS_DATANODE_DATA_DIR_KEY, newDirs),
+              is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+          done.set(true);
+        } catch (ReconfigurationException |
+            InterruptedException |
+            BrokenBarrierException e) {
+          exceptions.add(new IOException(e));
+        }
+      });
+      reconfigThread.start();
+
+      // Write more data to make sure the stream threads wait on the barrier.
+      rb.nextBytes(writeBuf);
+      out.write(writeBuf);
+      reconfigBarrier.await();
+      out.hflush();
+      out.close();
+
+      reconfigThread.join();
+
+      if (!exceptions.isEmpty()) {
+        throw MultipleIOException.createIOException(exceptions);
+      }
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
 
     // Verify if the data directory reconfigure was successful
     FsDatasetSpi<?> fsDatasetSpi = dn.getFSDataset();
@@ -851,19 +895,12 @@ public class TestDataNodeHotSwapVolumes {
           1, fsVolumeReferences.size());
     }
 
-    // Add a new DataNode to help with the pipeline recover.
-    cluster.startDataNodes(conf, 1, true, null, null, null);
-
     // Verify the file has sufficient replications.
     DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
 
-    if (!exceptions.isEmpty()) {
-      throw new IOException(exceptions.get(0).getCause());
-    }
-
     // Write more files to make sure that the DataNode that has removed volume
     // is still alive to receive data.
     for (int i = 0; i < 10; i++) {