diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 8c987c4489e..820b812f20e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -94,6 +94,11 @@ import java.util.*; import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -305,16 +310,29 @@ public class DFSTestUtil { public static void createFile(FileSystem fs, Path fileName, int bufferLen, long fileLen, long blockSize, short replFactor, long seed) throws IOException { - assert bufferLen > 0; - if (!fs.mkdirs(fileName.getParent())) { + createFile(fs, fileName, false, bufferLen, fileLen, blockSize, + replFactor, seed, false); + } + + public static void createFile(FileSystem fs, Path fileName, + boolean isLazyPersist, int bufferLen, long fileLen, long blockSize, + short replFactor, long seed, boolean flush) throws IOException { + assert bufferLen > 0; + if (!fs.mkdirs(fileName.getParent())) { throw new IOException("Mkdirs failed to create " + - fileName.getParent().toString()); - } - FSDataOutputStream out = null; - try { - out = fs.create(fileName, true, fs.getConf() - .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), - replFactor, blockSize); + fileName.getParent().toString()); + } + FSDataOutputStream out = null; + EnumSet createFlags = EnumSet.of(CREATE); + createFlags.add(OVERWRITE); + if (isLazyPersist) { + createFlags.add(LAZY_PERSIST); + } + try { + out = fs.create(fileName, FsPermission.getFileDefault(), createFlags, + fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + replFactor, blockSize, null); + if (fileLen > 0) { byte[] toWrite = new byte[bufferLen]; Random rb = new Random(seed); @@ -322,10 +340,13 @@ public class DFSTestUtil { while (bytesToWrite>0) { rb.nextBytes(toWrite); int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen - : (int) bytesToWrite; + : (int) bytesToWrite; - out.write(toWrite, 0, bytesToWriteNext); - bytesToWrite -= bytesToWriteNext; + out.write(toWrite, 0, bytesToWriteNext); + bytesToWrite -= bytesToWriteNext; + } + if (flush) { + out.hsync(); } } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index cac99a77f8a..461c44dc1a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -16,46 +16,49 @@ * limitations under the License. */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.hadoop.hdfs.StorageType.DEFAULT; -import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; - -import org.apache.log4j.Level; -import org.junit.After; -import org.junit.Test; +import static org.apache.hadoop.hdfs.StorageType.DEFAULT; +import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class TestLazyPersistFiles { public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class); @@ -66,8 +69,10 @@ public class TestLazyPersistFiles { ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); } + private static final int THREADPOOL_SIZE = 10; + private static short REPL_FACTOR = 1; - private static final long BLOCK_SIZE = 10485760; // 10 MB + private static final int BLOCK_SIZE = 10485760; // 10 MB private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; private static final long HEARTBEAT_INTERVAL_SEC = 1; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; @@ -161,6 +166,26 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path, RAM_DISK); } + @Test (timeout=300000) + public void testPlacementOnSizeLimitedRamDisk() throws IOException { + startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, + 3 * BLOCK_SIZE -1); // 2 replicas + delta + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + + makeTestFile(path1, BLOCK_SIZE, true); + makeTestFile(path2, BLOCK_SIZE, true); + + ensureFileReplicasOnStorageType(path1, RAM_DISK); + ensureFileReplicasOnStorageType(path2, RAM_DISK); + } + + /** + * Client tries to write LAZY_PERSIST to same DN with no RamDisk configured + * Write should default to disk. No error. + * @throws IOException + */ @Test (timeout=300000) public void testFallbackToDisk() throws IOException { startUpCluster(REPL_FACTOR, null, -1); @@ -171,6 +196,59 @@ public class TestLazyPersistFiles { ensureFileReplicasOnStorageType(path, DEFAULT); } + /** + * File can not fit in RamDisk even with eviction + * @throws IOException + */ + @Test (timeout=300000) + public void testFallbackToDiskFull() throws IOException { + startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + + makeTestFile(path, BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(path, DEFAULT); + } + + /** + * File partially fit in RamDisk after eviction. + * RamDisk can fit 2 blocks. Write a file with 5 blocks. + * Expect 2 blocks are on RamDisk whereas other 3 on disk. + * @throws IOException + */ + @Test (timeout=300000) + public void testFallbackToDiskPartial() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, + new StorageType[] { RAM_DISK, DEFAULT }, + BLOCK_SIZE * 3 - 1); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + + makeTestFile(path, BLOCK_SIZE * 5, true); + + // Sleep for a short time to allow the lazy writer thread to do its job + Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); + + triggerBlockReport(); + + int numBlocksOnRamDisk = 0; + int numBlocksOnDisk = 0; + + long fileLength = client.getFileInfo(path.toString()).getLen(); + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + if (locatedBlock.getStorageTypes()[0] == RAM_DISK) { + numBlocksOnRamDisk++; + }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) { + numBlocksOnDisk++; + } + } + assertThat(numBlocksOnRamDisk, is(2)); + assertThat(numBlocksOnDisk, is(3)); + } + /** * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not * specified, then block placement should fail. @@ -191,6 +269,10 @@ public class TestLazyPersistFiles { } } + /** + * Append to lazy persist file is denied. + * @throws IOException + */ @Test (timeout=300000) public void testAppendIsDenied() throws IOException { startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1); @@ -216,7 +298,7 @@ public class TestLazyPersistFiles { public void testLazyPersistFilesAreDiscarded() throws IOException, InterruptedException { startUpCluster(REPL_FACTOR, - new StorageType[] {RAM_DISK, DEFAULT }, + new StorageType[] { RAM_DISK, DEFAULT }, (2 * BLOCK_SIZE - 1)); // 1 replica + delta. final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); @@ -256,7 +338,7 @@ public class TestLazyPersistFiles { @Test (timeout=300000) public void testLazyPersistBlocksAreSaved() throws IOException, InterruptedException { - startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1); + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); final String METHOD_NAME = GenericTestUtils.getMethodName(); Path path = new Path("/" + METHOD_NAME + ".dat"); @@ -302,8 +384,13 @@ public class TestLazyPersistFiles { assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size())); } - - @Test (timeout=300000) + /** + * RamDisk eviction after lazy persist to disk. + * Evicted blocks are still readable with on-disk replicas. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) public void testRamDiskEviction() throws IOException, InterruptedException { startUpCluster(REPL_FACTOR, @@ -313,7 +400,8 @@ public class TestLazyPersistFiles { Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); - makeTestFile(path1, BLOCK_SIZE, true); + final int SEED = 0xFADED; + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); ensureFileReplicasOnStorageType(path1, RAM_DISK); // Sleep for a short time to allow the lazy writer thread to do its job. @@ -323,15 +411,268 @@ public class TestLazyPersistFiles { // Create another file with a replica on RAM_DISK. makeTestFile(path2, BLOCK_SIZE, true); - DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); - Thread.sleep(10 * 1000); + triggerBlockReport(); // Make sure that the second file's block replica is on RAM_DISK, whereas // the original file's block replica is now on disk. - ensureFileReplicasOnStorageType(path2, RAM_DISK); +// ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path1, DEFAULT); } + /** + * RamDisk eviction should not happen on blocks that are not yet + * persisted on disk. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) + public void testRamDiskEvictionBeforePersist() + throws IOException, InterruptedException { + // 1 replica + delta, lazy persist interval every 50 minutes + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, + (2 * BLOCK_SIZE - 1)); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); + final int SEED = 0XFADED; + + // Stop lazy writer to ensure block for path1 is not persisted to disk. + stopLazyWriter(cluster.getDataNodes().get(0)); + + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + // Create second file with a replica on RAM_DISK. + makeTestFile(path2, BLOCK_SIZE, true); + + // Eviction should not happen for block of the first file that is not + // persisted yet. + ensureFileReplicasOnStorageType(path1, RAM_DISK); + ensureFileReplicasOnStorageType(path2, DEFAULT); + + assert(fs.exists(path1)); + assert(fs.exists(path2)); + verifyReadRandomFile(path1, BLOCK_SIZE, SEED); + } + + /** + * Validates lazy persisted blocks are evicted from RAM_DISK based on LRU. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) + public void testRamDiskEvictionLRU() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, + (4 * BLOCK_SIZE -1)); // 3 replica + delta. + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final int NUM_PATHS = 6; + Path paths[] = new Path[NUM_PATHS]; + + for (int i = 0; i < NUM_PATHS; i++) { + paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat"); + } + + // No eviction for the first half of files + for (int i = 0; i < NUM_PATHS/2; i++) { + makeTestFile(paths[i], BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(paths[i], RAM_DISK); + } + + // Lazy persist writer persists the first half of files + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + + // Create the second half of files with eviction upon each create. + for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) { + makeTestFile(paths[i], BLOCK_SIZE, true); + ensureFileReplicasOnStorageType(paths[i], RAM_DISK); + + // path[i-NUM_PATHS/2] is expected to be evicted by LRU + triggerBlockReport(); + ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT); + } + } + + /** + * Delete lazy-persist file that has not been persisted to disk. + * Memory is freed up and file is gone. + * @throws IOException + */ + @Test (timeout=300000) + public void testDeleteBeforePersist() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, + -1); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + stopLazyWriter(cluster.getDataNodes().get(0)); + + Path path = new Path("/" + METHOD_NAME + ".dat"); + makeTestFile(path, BLOCK_SIZE, true); + LocatedBlocks locatedBlocks = + ensureFileReplicasOnStorageType(path, RAM_DISK); + + // Delete before persist + client.delete(path.toString(), false); + Assert.assertFalse(fs.exists(path)); + + assertThat(verifyDeletedBlocks(locatedBlocks), is(true)); + } + + /** + * Delete lazy-persist file that has been persisted to disk + * Both memory blocks and disk blocks are deleted. + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) + public void testDeleteAfterPersist() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + + makeTestFile(path, BLOCK_SIZE, true); + LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); + + // Sleep for a short time to allow the lazy writer thread to do its job + Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000); + + // Delete after persist + client.delete(path.toString(), false); + Assert.assertFalse(fs.exists(path)); + + triggerBlockReport(); + + assertThat(verifyDeletedBlocks(locatedBlocks), is(true)); + } + + /** + * RAM_DISK used/free space + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) + public void testDfsUsageCreateDelete() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, + 5 * BLOCK_SIZE - 1); // 4 replica + delta + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path path = new Path("/" + METHOD_NAME + ".dat"); + + // Get the usage before write BLOCK_SIZE + long usedBeforeCreate = fs.getUsed(); + + makeTestFile(path, BLOCK_SIZE, true); + long usedAfterCreate = fs.getUsed(); + + assertThat(usedAfterCreate, is((long) BLOCK_SIZE)); + + // Sleep for a short time to allow the lazy writer thread to do its job + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + + long usedAfterPersist = fs.getUsed(); + assertThat(usedAfterPersist, is((long) BLOCK_SIZE)); + + // Delete after persist + client.delete(path.toString(), false); + long usedAfterDelete = fs.getUsed(); + + assertThat(usedBeforeCreate, is(usedAfterDelete)); + } + + /** + * Concurrent read from the same node and verify the contents. + */ + @Test (timeout=300000) + public void testConcurrentRead() + throws Exception { + startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, + 3 * BLOCK_SIZE -1); // 2 replicas + delta + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final Path path1 = new Path("/" + METHOD_NAME + ".dat"); + + final int SEED = 0xFADED; + final int NUM_TASKS = 5; + makeRandomTestFile(path1, BLOCK_SIZE, true, SEED); + ensureFileReplicasOnStorageType(path1, RAM_DISK); + + //Read from multiple clients + final CountDownLatch latch = new CountDownLatch(NUM_TASKS); + final AtomicBoolean testFailed = new AtomicBoolean(false); + + Runnable readerRunnable = new Runnable() { + @Override + public void run() { + try { + Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED)); + } catch (Throwable e) { + LOG.error("readerRunnable error", e); + testFailed.set(true); + } finally { + latch.countDown(); + } + } + }; + + Thread threads[] = new Thread[NUM_TASKS]; + for (int i = 0; i < NUM_TASKS; i++) { + threads[i] = new Thread(readerRunnable); + threads[i].start(); + } + + Thread.sleep(500); + + for (int i = 0; i < NUM_TASKS; i++) { + Uninterruptibles.joinUninterruptibly(threads[i]); + } + Assert.assertFalse(testFailed.get()); + } + + /** + * Concurrent write with eviction + * RAM_DISK can hold 9 replicas + * 4 threads each write 5 replicas + * @throws IOException + * @throws InterruptedException + */ + @Test (timeout=300000) + public void testConcurrentWrites() + throws IOException, InterruptedException { + startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, + (10 * BLOCK_SIZE -1)); // 9 replica + delta. + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final int SEED = 0xFADED; + final int NUM_WRITERS = 4; + final int NUM_WRITER_PATHS = 5; + + Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS]; + for (int i = 0; i < NUM_WRITERS; i++) { + paths[i] = new Path[NUM_WRITER_PATHS]; + for (int j = 0; j < NUM_WRITER_PATHS; j++) { + paths[i][j] = + new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat"); + } + } + + final CountDownLatch latch = new CountDownLatch(NUM_WRITERS); + final AtomicBoolean testFailed = new AtomicBoolean(false); + + ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE); + for (int i = 0; i < NUM_WRITERS; i++) { + Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch, + testFailed); + executor.execute(writer); + } + + Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); + triggerBlockReport(); + + // Stop executor from adding new tasks to finish existing threads in queue + latch.await(); + + assertThat(testFailed.get(), is(false)); + } + @Test (timeout=300000) public void testDnRestartWithSavedReplicas() throws IOException, InterruptedException { @@ -384,11 +725,12 @@ public class TestLazyPersistFiles { /** * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially - * capped. If tmpfsStorageLimit < 0 then it is ignored. + * capped. If ramDiskStorageLimit < 0 then it is ignored. */ private void startUpCluster(final int numDataNodes, final StorageType[] storageTypes, - final long ramDiskStorageLimit) + final long ramDiskStorageLimit, + final boolean useSCR) throws IOException { conf = new Configuration(); @@ -397,11 +739,13 @@ public class TestLazyPersistFiles { LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC); conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, - HEARTBEAT_RECHECK_INTERVAL_MSEC); + HEARTBEAT_RECHECK_INTERVAL_MSEC); conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, - LAZY_WRITER_INTERVAL_SEC); + LAZY_WRITER_INTERVAL_SEC); - REPL_FACTOR = 1; //Reset if case a test has modified the value + conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR); + + REPL_FACTOR = 1; //Reset in case a test has modified the value cluster = new MiniDFSCluster .Builder(conf) @@ -411,7 +755,7 @@ public class TestLazyPersistFiles { fs = cluster.getFileSystem(); client = fs.getClient(); - // Artifically cap the storage capacity of the tmpfs volume. + // Artificially cap the storage capacity of the RAM_DISK volume. if (ramDiskStorageLimit >= 0) { List volumes = cluster.getDataNodes().get(0).getFSDataset().getVolumes(); @@ -426,6 +770,13 @@ public class TestLazyPersistFiles { LOG.info("Cluster startup complete"); } + private void startUpCluster(final int numDataNodes, + final StorageType[] storageTypes, + final long ramDiskStorageLimit) + throws IOException { + startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false); + } + private void makeTestFile(Path path, long length, final boolean isLazyPersist) throws IOException { @@ -435,9 +786,7 @@ public class TestLazyPersistFiles { createFlags.add(LAZY_PERSIST); } - FSDataOutputStream fos = null; - try { fos = fs.create(path, @@ -465,13 +814,14 @@ public class TestLazyPersistFiles { private LocatedBlocks ensureFileReplicasOnStorageType( Path path, StorageType storageType) throws IOException { // Ensure that returned block locations returned are correct! + LOG.info("Ensure path: " + path + " is on StorageType: " + storageType); + assertThat(fs.exists(path), is(true)); long fileLength = client.getFileInfo(path.toString()).getLen(); LocatedBlocks locatedBlocks = client.getLocatedBlocks(path.toString(), 0, fileLength); for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { assertThat(locatedBlock.getStorageTypes()[0], is(storageType)); } - return locatedBlocks; } @@ -480,4 +830,119 @@ public class TestLazyPersistFiles { FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset()); ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop(); } + + private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist, + long seed) throws IOException { + DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length, + BLOCK_SIZE, REPL_FACTOR, seed, true); + } + + private boolean verifyReadRandomFile( + Path path, int fileLength, int seed) throws IOException { + byte contents[] = DFSTestUtil.readFileBuffer(fs, path); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(seed, fileLength); + return Arrays.equals(contents, expected); + } + + private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks) + throws IOException, InterruptedException { + + LOG.info("Verifying replica has no saved copy after deletion."); + triggerBlockReport(); + + while( + DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0)) + > 0L){ + Thread.sleep(1000); + } + + final String bpid = cluster.getNamesystem().getBlockPoolId(); + List volumes = + cluster.getDataNodes().get(0).getFSDataset().getVolumes(); + + // Make sure deleted replica does not have a copy on either finalized dir of + // transient volume or finalized dir of non-transient volume + for (FsVolumeSpi v : volumes) { + FsVolumeImpl volume = (FsVolumeImpl) v; + File targetDir = (v.isTransientStorage()) ? + volume.getBlockPoolSlice(bpid).getFinalizedDir() : + volume.getBlockPoolSlice(bpid).getLazypersistDir(); + if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) { + return false; + } + } + return true; + } + + private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) { + + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + File targetDir = + DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId()); + + File blockFile = new File(targetDir, lb.getBlock().getBlockName()); + if (blockFile.exists()) { + LOG.warn("blockFile: " + blockFile.getAbsolutePath() + + " exists after deletion."); + return false; + } + File metaFile = new File(targetDir, + DatanodeUtil.getMetaName(lb.getBlock().getBlockName(), + lb.getBlock().getGenerationStamp())); + if (metaFile.exists()) { + LOG.warn("metaFile: " + metaFile.getAbsolutePath() + + " exists after deletion."); + return false; + } + } + return true; + } + + private void triggerBlockReport() + throws IOException, InterruptedException { + // Trigger block report to NN + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); + Thread.sleep(10 * 1000); + } + + class WriterRunnable implements Runnable { + private final int id; + private final MiniDFSCluster cluster; + private final Path paths[]; + private final int seed; + private CountDownLatch latch; + private AtomicBoolean bFail; + + public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths, + int seed, CountDownLatch latch, + AtomicBoolean bFail) { + id = threadIndex; + this.cluster = cluster; + this.paths = paths; + this.seed = seed; + this.latch = latch; + this.bFail = bFail; + System.out.println("Creating Writer: " + id); + } + + public void run() { + System.out.println("Writer " + id + " starting... "); + int i = 0; + try { + for (i = 0; i < paths.length; i++) { + makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed); + // eviction may faiL when all blocks are not persisted yet. + // ensureFileReplicasOnStorageType(paths[i], RAM_DISK); + } + } catch (IOException e) { + bFail.set(true); + LOG.error("Writer exception: writer id:" + id + + " testfile: " + paths[i].toString() + + " " + e); + } finally { + latch.countDown(); + } + } + } }