diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index fb6d83fcc6b..bb75e3aceb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -432,6 +432,7 @@ class BlockSender implements java.io.Closeable { ris = new ReplicaInputStreams( blockIn, checksumIn, volumeRef, fileIoProvider); } catch (IOException ioe) { + IOUtils.cleanupWithLogger(null, volumeRef); IOUtils.closeStream(this); org.apache.commons.io.IOUtils.closeQuietly(blockIn); org.apache.commons.io.IOUtils.closeQuietly(checksumIn); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 2a89a80d17a..706c078e648 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -167,18 +167,26 @@ class FsDatasetAsyncDiskService { * Execute the task sometime in the future, using ThreadPools. */ synchronized void execute(FsVolumeImpl volume, Runnable task) { - if (executors == null) { - throw new RuntimeException("AsyncDiskService is already shutdown"); - } - if (volume == null) { - throw new RuntimeException("A null volume does not have a executor"); - } - ThreadPoolExecutor executor = executors.get(volume.getStorageID()); - if (executor == null) { - throw new RuntimeException("Cannot find volume " + volume - + " for execution of task " + task); - } else { - executor.execute(task); + try { + if (executors == null) { + throw new RuntimeException("AsyncDiskService is already shutdown"); + } + if (volume == null) { + throw new RuntimeException("A null volume does not have a executor"); + } + ThreadPoolExecutor executor = executors.get(volume.getStorageID()); + if (executor == null) { + throw new RuntimeException("Cannot find volume " + volume + + " for execution of task " + task); + } else { + executor.execute(task); + } + } catch (RuntimeException re) { + if (task instanceof ReplicaFileDeleteTask) { + IOUtils.cleanupWithLogger(null, + ((ReplicaFileDeleteTask) task).volumeRef); + } + throw re; } } @@ -314,28 +322,31 @@ class FsDatasetAsyncDiskService { @Override public void run() { - final long blockLength = replicaToDelete.getBlockDataLength(); - final long metaLength = replicaToDelete.getMetadataLength(); - boolean result; + try { + final long blockLength = replicaToDelete.getBlockDataLength(); + final long metaLength = replicaToDelete.getMetadataLength(); + boolean result; - result = (trashDirectory == null) ? deleteFiles() : moveFiles(); + result = (trashDirectory == null) ? deleteFiles() : moveFiles(); - if (!result) { - LOG.warn("Unexpected error trying to " - + (trashDirectory == null ? "delete" : "move") - + " block " + block.getBlockPoolId() + " " + block.getLocalBlock() - + " at file " + replicaToDelete.getBlockURI() + ". Ignored."); - } else { - if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){ - datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); + if (!result) { + LOG.warn("Unexpected error trying to " + + (trashDirectory == null ? "delete" : "move") + + " block " + block.getBlockPoolId() + " " + block.getLocalBlock() + + " at file " + replicaToDelete.getBlockURI() + ". Ignored."); + } else { + if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) { + datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID()); + } + volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); + volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); + LOG.info("Deleted " + block.getBlockPoolId() + " " + + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI()); } - volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength); - volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength); - LOG.info("Deleted " + block.getBlockPoolId() + " " - + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI()); + updateDeletedBlockId(block); + } finally { + IOUtils.cleanupWithLogger(null, this.volumeRef); } - updateDeletedBlockId(block); - IOUtils.cleanupWithLogger(null, volumeRef); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 07e14fb04e4..6681f6fd64c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -319,7 +319,7 @@ public class FsVolumeImpl implements FsVolumeSpi { } @VisibleForTesting - int getReferenceCount() { + public int getReferenceCount() { return this.reference.getReferenceCount(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index a77faf2cec8..0d42ae99e35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -153,16 +154,24 @@ class RamDiskAsyncLazyPersistService { * Execute the task sometime in the future, using ThreadPools. */ synchronized void execute(String storageId, Runnable task) { - if (executors == null) { - throw new RuntimeException( - "AsyncLazyPersistService is already shutdown"); - } - ThreadPoolExecutor executor = executors.get(storageId); - if (executor == null) { - throw new RuntimeException("Cannot find root storage volume with id " + - storageId + " for execution of task " + task); - } else { - executor.execute(task); + try { + if (executors == null) { + throw new RuntimeException( + "AsyncLazyPersistService is already shutdown"); + } + ThreadPoolExecutor executor = executors.get(storageId); + if (executor == null) { + throw new RuntimeException("Cannot find root storage volume with id " + + storageId + " for execution of task " + task); + } else { + executor.execute(task); + } + } catch (RuntimeException re) { + if (task instanceof ReplicaLazyPersistTask) { + IOUtils.cleanupWithLogger(null, + ((ReplicaLazyPersistTask) task).targetVolume); + } + throw re; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index b9da5f446f9..b1a675c77b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -33,6 +33,9 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.Random; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -562,4 +565,56 @@ public class TestDataTransferProtocol { checksum, CachingStrategy.newDefaultStrategy(), false, false, null, null, new String[0]); } + + @Test(timeout = 30000) + public void testReleaseVolumeRefIfExceptionThrown() + throws IOException, InterruptedException { + Path file = new Path("dataprotocol.dat"); + int numDataNodes = 1; + + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( + numDataNodes).build(); + try { + cluster.waitActive(); + datanode = cluster.getFileSystem().getDataNodeStats( + DatanodeReportType.LIVE)[0]; + dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr()); + FileSystem fileSys = cluster.getFileSystem(); + + int fileLen = Math.min( + conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096); + + DFSTestUtil.createFile(fileSys, file, fileLen, fileLen, + fileSys.getDefaultBlockSize(file), + fileSys.getDefaultReplication(file), 0L); + + // Get the first blockid for the file. + final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file); + + String bpid = cluster.getNamesystem().getBlockPoolId(); + ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock()); + sendBuf.reset(); + recvBuf.reset(); + + // Delete the meta file to create a exception in BlockSender constructor. + DataNode dn = cluster.getDataNodes().get(0); + cluster.getMaterializedReplica(0, blk).deleteMeta(); + + FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset( + dn).getVolume(blk); + int beforeCnt = volume.getReferenceCount(); + + sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN); + sendRecvData("Copy a block.", false); + Thread.sleep(3000); + + int afterCnt = volume.getReferenceCount(); + assertEquals(beforeCnt, afterCnt); + + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 6ae6248d3f9..778ef97180b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; import org.apache.hadoop.fs.DF; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; +import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import java.io.OutputStream; @@ -1805,4 +1806,37 @@ public class TestFsDatasetImpl { cluster.shutdown(); } } + + @Test(timeout = 20000) + public void testReleaseVolumeRefIfExceptionThrown() throws IOException { + MiniDFSCluster cluster = new MiniDFSCluster.Builder( + new HdfsConfiguration()).build(); + cluster.waitActive(); + FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0); + ExtendedBlock eb; + ReplicaInfo info; + int beforeCnt = 0; + try { + List blockList = new ArrayList(); + eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001); + info = new FinalizedReplica( + eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); + dataset.volumeMap.add(BLOCKPOOL, info); + ((LocalReplica) info).getBlockFile().createNewFile(); + ((LocalReplica) info).getMetaFile().createNewFile(); + blockList.add(info); + + // Create a runtime exception. + dataset.asyncDiskService.shutdown(); + + beforeCnt = vol.getReferenceCount(); + dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); + + } catch (RuntimeException re) { + int afterCnt = vol.getReferenceCount(); + assertEquals(beforeCnt, afterCnt); + } finally { + cluster.shutdown(); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index c0b4b17ea95..14ed26e9b55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ThreadUtil; import org.junit.Assert; @@ -280,4 +283,38 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { } } } + + @Test(timeout = 20000) + public void testReleaseVolumeRefIfExceptionThrown() + throws IOException, InterruptedException { + getClusterBuilder().setRamDiskReplicaCapacity(2).build(); + final String methodName = GenericTestUtils.getMethodName(); + final int seed = 0xFADED; + Path path = new Path("/" + methodName + ".Writer.File.dat"); + + DataNode dn = cluster.getDataNodes().get(0); + FsDatasetSpi.FsVolumeReferences volumes = + DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences(); + int[] beforeCnts = new int[volumes.size()]; + FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn); + + // Create a runtime exception. + ds.asyncLazyPersistService.shutdown(); + for (int i = 0; i < volumes.size(); ++i) { + beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount(); + } + + makeRandomTestFile(path, BLOCK_SIZE, true, seed); + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + + for (int i = 0; i < volumes.size(); ++i) { + int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount(); + // LazyWriter keeps trying to save copies even if + // asyncLazyPersistService is already shutdown. + // If we do not release references, the number of + // references will increase infinitely. + Assert.assertTrue( + beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1)); + } + } }