diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index ff81b5aae70..db19128043c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -420,6 +420,7 @@ class BlockSender implements java.io.Closeable {
         ris = new ReplicaInputStreams(
             blockIn, checksumIn, volumeRef, fileIoProvider);
       } catch (IOException ioe) {
+        IOUtils.cleanupWithLogger(null, volumeRef);
         IOUtils.closeStream(this);
         org.apache.commons.io.IOUtils.closeQuietly(blockIn);
         org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 10ee9cf5f64..93c8ea68fd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -161,15 +161,23 @@ class FsDatasetAsyncDiskService {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(File root, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException("AsyncDiskService is already shutdown");
-    }
-    ThreadPoolExecutor executor = executors.get(root);
-    if (executor == null) {
-      throw new RuntimeException("Cannot find root " + root
-          + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {
+      if (executors == null) {
+        throw new RuntimeException("AsyncDiskService is already shutdown");
+      }
+      ThreadPoolExecutor executor = executors.get(root);
+      if (executor == null) {
+        throw new RuntimeException("Cannot find root " + root
+            + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaFileDeleteTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaFileDeleteTask) task).volumeRef);
+      }
+      throw re;
     }
   }
 
@@ -301,28 +309,31 @@ class FsDatasetAsyncDiskService {
 
     @Override
     public void run() {
-      final long blockLength = blockFile.length();
-      final long metaLength = metaFile.length();
-      boolean result;
+      try {
+        final long blockLength = blockFile.length();
+        final long metaLength = metaFile.length();
+        boolean result;
 
-      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+        result = (trashDirectory == null) ? deleteFiles() : moveFiles();
 
-      if (!result) {
-        LOG.warn("Unexpected error trying to "
-            + (trashDirectory == null ? "delete" : "move")
-            + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
-            + " at file " + blockFile + ". Ignored.");
-      } else {
-        if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
-          datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
-        }
-        volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
-        volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
-        LOG.info("Deleted " + block.getBlockPoolId() + " "
+        if (!result) {
+          LOG.warn("Unexpected error trying to "
+              + (trashDirectory == null ? "delete" : "move")
+              + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+              + " at file " + blockFile + ". Ignored.");
+        } else {
+          if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
+            datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+          }
+          volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+          volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
+          LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
+        }
+        updateDeletedBlockId(block);
+      } finally {
+        IOUtils.cleanupWithLogger(null, this.volumeRef);
       }
-      updateDeletedBlockId(block);
-      IOUtils.cleanup(null, volumeRef);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e43579e91d5..9c304c0c073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -261,7 +261,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   @VisibleForTesting
-  int getReferenceCount() {
+  public int getReferenceCount() {
     return this.reference.getReferenceCount();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 476f5961cec..868dd0decce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -146,15 +147,23 @@ class RamDiskAsyncLazyPersistService {
    * Execute the task sometime in the future, using ThreadPools.
    */
   synchronized void execute(File root, Runnable task) {
-    if (executors == null) {
-      throw new RuntimeException("AsyncLazyPersistService is already shutdown");
-    }
-    ThreadPoolExecutor executor = executors.get(root);
-    if (executor == null) {
-      throw new RuntimeException("Cannot find root " + root
-          + " for execution of task " + task);
-    } else {
-      executor.execute(task);
+    try {
+      if (executors == null) {
+        throw new RuntimeException("AsyncLazyPersistService is already shutdown");
+      }
+      ThreadPoolExecutor executor = executors.get(root);
+      if (executor == null) {
+        throw new RuntimeException("Cannot find root " + root
+            + " for execution of task " + task);
+      } else {
+        executor.execute(task);
+      }
+    } catch (RuntimeException re) {
+      if (task instanceof ReplicaLazyPersistTask) {
+        IOUtils.cleanupWithLogger(null,
+            ((ReplicaLazyPersistTask) task).targetVolume);
+      }
+      throw re;
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 2335b9a8c6b..3b89fcdc6c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -35,6 +35,9 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -567,4 +570,56 @@ public class TestDataTransferProtocol {
         0, block.getNumBytes(), block.getNumBytes(), newGS,
         checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
+
+  @Test(timeout = 30000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        numDataNodes).build();
+    try {
+      cluster.waitActive();
+      datanode = cluster.getFileSystem().getDataNodeStats(
+          DatanodeReportType.LIVE)[0];
+      dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+      FileSystem fileSys = cluster.getFileSystem();
+
+      int fileLen = Math.min(
+          conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+      DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+          fileSys.getDefaultBlockSize(file),
+          fileSys.getDefaultReplication(file), 0L);
+
+      // Get the first blockid for the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+      sendBuf.reset();
+      recvBuf.reset();
+
+      // Delete the meta file to create an exception in BlockSender constructor.
+      DataNode dn = cluster.getDataNodes().get(0);
+      cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+      FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+          dn).getVolume(blk);
+      int beforeCnt = volume.getReferenceCount();
+
+      sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+      sendRecvData("Copy a block.", false);
+      Thread.sleep(3000);
+
+      int afterCnt = volume.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 03f4b69dfe4..0ee3196c229 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -806,4 +806,37 @@ public class TestFsDatasetImpl {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+        new HdfsConfiguration()).build();
+    cluster.waitActive();
+    FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
+    ExtendedBlock eb;
+    ReplicaInfo info;
+    int beforeCnt = 0;
+    try {
+      List<Block> blockList = new ArrayList<>();
+      eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      dataset.volumeMap.add(BLOCKPOOL, info);
+      info.getBlockFile().createNewFile();
+      info.getMetaFile().createNewFile();
+      blockList.add(info);
+
+      // Create a runtime exception.
+      dataset.asyncDiskService.shutdown();
+
+      beforeCnt = vol.getReferenceCount();
+      dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+
+    } catch (RuntimeException re) {
+      int afterCnt = vol.getReferenceCount();
+      assertEquals(beforeCnt, afterCnt);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index b177a7bd489..a22faf2d927 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -291,4 +294,38 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       }
     }
   }
+
+  @Test(timeout = 20000)
+  public void testReleaseVolumeRefIfExceptionThrown()
+      throws IOException, InterruptedException {
+    getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+    final String methodName = GenericTestUtils.getMethodName();
+    final int seed = 0xFADED;
+    Path path = new Path("/" + methodName + ".Writer.File.dat");
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetSpi.FsVolumeReferences volumes =
+        DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
+    int[] beforeCnts = new int[volumes.size()];
+    FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+
+    // Create a runtime exception.
+    ds.asyncLazyPersistService.shutdown();
+    for (int i = 0; i < volumes.size(); ++i) {
+      beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+    }
+
+    makeRandomTestFile(path, BLOCK_SIZE, true, seed);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < volumes.size(); ++i) {
+      int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+      // LazyWriter keeps trying to save copies even if
+      // asyncLazyPersistService is already shutdown.
+      // If we do not release references, the number of
+      // references will increase infinitely.
+      Assert.assertTrue(
+          beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
+    }
+  }
 }