diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java index fb83a3e3ee1..bf27f2c3dac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java @@ -23,7 +23,11 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -38,12 +42,15 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class TestOpenFilesWithSnapshot { + private static final Log LOG = + LogFactory.getLog(TestOpenFilesWithSnapshot.class.getName()); private final Configuration conf = new Configuration(); MiniDFSCluster cluster = null; DistributedFileSystem fs = null; @@ -622,6 +629,108 @@ public class TestOpenFilesWithSnapshot { hbaseOutputStream.close(); } + /** + * Test client writing to open files are not interrupted when snapshots + * that captured open files get deleted. + */ + @Test (timeout = 240000) + public void testOpenFileWritingAcrossSnapDeletion() throws Exception { + final Path snapRootDir = new Path("/level_0_A"); + final String flumeFileName = "flume.log"; + final String hbaseFileName = "hbase.log"; + final String snap1Name = "snap_1"; + final String snap2Name = "snap_2"; + final String snap3Name = "snap_3"; + + // Create files and open streams + final Path flumeFile = new Path(snapRootDir, flumeFileName); + FSDataOutputStream flumeOut = fs.create(flumeFile, false, + 8000, (short)3, 1048576); + flumeOut.close(); + final Path hbaseFile = new Path(snapRootDir, hbaseFileName); + FSDataOutputStream hbaseOut = fs.create(hbaseFile, false, + 8000, (short)3, 1048576); + hbaseOut.close(); + + final AtomicBoolean writerError = new AtomicBoolean(false); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch deleteLatch = new CountDownLatch(1); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + FSDataOutputStream flumeOutputStream = fs.append(flumeFile, 8000); + FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile, 8000); + byte[] bytes = new byte[(int) (1024 * 0.2)]; + Random r = new Random(Time.now()); + + for (int i = 0; i < 200000; i++) { + r.nextBytes(bytes); + flumeOutputStream.write(bytes); + if (hbaseOutputStream != null) { + hbaseOutputStream.write(bytes); + } + if (i == 50000) { + startLatch.countDown(); + } else if (i == 100000) { + deleteLatch.countDown(); + } else if (i == 150000) { + hbaseOutputStream.hsync(); + fs.delete(hbaseFile, true); + try { + hbaseOutputStream.close(); + } catch (Exception e) { + // since the file is deleted before the open stream close, + // it might throw FileNotFoundException. Ignore the + // expected exception. + } + hbaseOutputStream = null; + } else if (i % 5000 == 0) { + LOG.info("Write pos: " + flumeOutputStream.getPos() + + ", size: " + fs.getFileStatus(flumeFile).getLen() + + ", loop: " + (i + 1)); + } + } + } catch (Exception e) { + LOG.warn("Writer error: " + e); + writerError.set(true); + } + } + }); + t.start(); + + startLatch.await(); + final Path snap1Dir = SnapshotTestHelper.createSnapshot( + fs, snapRootDir, snap1Name); + final Path flumeS1Path = new Path(snap1Dir, flumeFileName); + LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path)); + LOG.info("Current file status: " + fs.getFileStatus(flumeFile)); + + deleteLatch.await(); + LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path)); + LOG.info("Current file status: " + fs.getFileStatus(flumeFile)); + + // Verify deletion of snapshot which had the under construction file + // captured is not truncating the under construction file and the thread + // writing to the same file not crashing on newer block allocations. + LOG.info("Deleting " + snap1Name); + fs.deleteSnapshot(snapRootDir, snap1Name); + + // Verify creation and deletion of snapshot newer than the oldest + // snapshot is not crashing the thread writing to under construction file. + SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap2Name); + SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap3Name); + fs.deleteSnapshot(snapRootDir, snap3Name); + fs.deleteSnapshot(snapRootDir, snap2Name); + SnapshotTestHelper.createSnapshot(fs, snapRootDir, "test"); + + t.join(); + Assert.assertFalse("Client encountered writing error!", writerError.get()); + + restartNameNode(); + cluster.waitActive(); + } + private void restartNameNode() throws Exception { cluster.triggerBlockReports(); NameNode nameNode = cluster.getNameNode();