HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
This commit is contained in:
arp 2014-09-03 10:51:26 -07:00 committed by Jitendra Pandey
parent 43ea145c39
commit 225ffdb6d8
2 changed files with 537 additions and 51 deletions

View File

@ -94,6 +94,11 @@
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -305,16 +310,29 @@ public static void createFile(FileSystem fs, Path fileName, long fileLen,
public static void createFile(FileSystem fs, Path fileName, int bufferLen, public static void createFile(FileSystem fs, Path fileName, int bufferLen,
long fileLen, long blockSize, short replFactor, long seed) long fileLen, long blockSize, short replFactor, long seed)
throws IOException { throws IOException {
createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
replFactor, seed, false);
}
public static void createFile(FileSystem fs, Path fileName,
boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
short replFactor, long seed, boolean flush) throws IOException {
assert bufferLen > 0; assert bufferLen > 0;
if (!fs.mkdirs(fileName.getParent())) { if (!fs.mkdirs(fileName.getParent())) {
throw new IOException("Mkdirs failed to create " + throw new IOException("Mkdirs failed to create " +
fileName.getParent().toString()); fileName.getParent().toString());
} }
FSDataOutputStream out = null; FSDataOutputStream out = null;
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
createFlags.add(OVERWRITE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
try { try {
out = fs.create(fileName, true, fs.getConf() out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
replFactor, blockSize); replFactor, blockSize, null);
if (fileLen > 0) { if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen]; byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed); Random rb = new Random(seed);
@ -327,6 +345,9 @@ public static void createFile(FileSystem fs, Path fileName, int bufferLen,
out.write(toWrite, 0, bytesToWriteNext); out.write(toWrite, 0, bytesToWriteNext);
bytesToWrite -= bytesToWriteNext; bytesToWrite -= bytesToWriteNext;
} }
if (flush) {
out.hsync();
}
} }
} finally { } finally {
if (out != null) { if (out != null) {

View File

@ -16,46 +16,49 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.hamcrest.core.Is.is; import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST; import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import org.apache.log4j.Level; import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import org.junit.After; import static org.hamcrest.core.Is.is;
import org.junit.Test; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class TestLazyPersistFiles { public class TestLazyPersistFiles {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class); public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
@ -66,8 +69,10 @@ public class TestLazyPersistFiles {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
} }
private static final int THREADPOOL_SIZE = 10;
private static short REPL_FACTOR = 1; private static short REPL_FACTOR = 1;
private static final long BLOCK_SIZE = 10485760; // 10 MB private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1; private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@ -161,6 +166,26 @@ public void testPlacementOnRamDisk() throws IOException {
ensureFileReplicasOnStorageType(path, RAM_DISK); ensureFileReplicasOnStorageType(path, RAM_DISK);
} }
@Test (timeout=300000)
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true);
makeTestFile(path2, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, RAM_DISK);
}
/**
* Client tries to write LAZY_PERSIST to same DN with no RamDisk configured
* Write should default to disk. No error.
* @throws IOException
*/
@Test (timeout=300000) @Test (timeout=300000)
public void testFallbackToDisk() throws IOException { public void testFallbackToDisk() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(REPL_FACTOR, null, -1);
@ -171,6 +196,59 @@ public void testFallbackToDisk() throws IOException {
ensureFileReplicasOnStorageType(path, DEFAULT); ensureFileReplicasOnStorageType(path, DEFAULT);
} }
/**
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDiskFull() throws IOException {
startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
}
/**
* File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
* Expect 2 blocks are on RamDisk whereas other 3 on disk.
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
new StorageType[] { RAM_DISK, DEFAULT },
BLOCK_SIZE * 3 - 1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE * 5, true);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
int numBlocksOnRamDisk = 0;
int numBlocksOnDisk = 0;
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
numBlocksOnRamDisk++;
}else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
numBlocksOnDisk++;
}
}
assertThat(numBlocksOnRamDisk, is(2));
assertThat(numBlocksOnDisk, is(3));
}
/** /**
* If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
* specified, then block placement should fail. * specified, then block placement should fail.
@ -191,6 +269,10 @@ public void testRamDiskNotChosenByDefault() throws IOException {
} }
} }
/**
* Append to lazy persist file is denied.
* @throws IOException
*/
@Test (timeout=300000) @Test (timeout=300000)
public void testAppendIsDenied() throws IOException { public void testAppendIsDenied() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1); startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
@ -302,7 +384,12 @@ public void testLazyPersistBlocksAreSaved()
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size())); assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
} }
/**
* RamDisk eviction after lazy persist to disk.
* Evicted blocks are still readable with on-disk replicas.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000) @Test (timeout=300000)
public void testRamDiskEviction() public void testRamDiskEviction()
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -313,7 +400,8 @@ public void testRamDiskEviction()
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true); final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job. // Sleep for a short time to allow the lazy writer thread to do its job.
@ -323,15 +411,268 @@ public void testRamDiskEviction()
// Create another file with a replica on RAM_DISK. // Create another file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true); makeTestFile(path2, BLOCK_SIZE, true);
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0)); triggerBlockReport();
Thread.sleep(10 * 1000);
// Make sure that the second file's block replica is on RAM_DISK, whereas // Make sure that the second file's block replica is on RAM_DISK, whereas
// the original file's block replica is now on disk. // the original file's block replica is now on disk.
ensureFileReplicasOnStorageType(path2, RAM_DISK); // ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT); ensureFileReplicasOnStorageType(path1, DEFAULT);
} }
/**
* RamDisk eviction should not happen on blocks that are not yet
* persisted on disk.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException {
// 1 replica + delta, lazy persist interval every 50 minutes
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1));
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0XFADED;
// Stop lazy writer to ensure block for path1 is not persisted to disk.
stopLazyWriter(cluster.getDataNodes().get(0));
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create second file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true);
// Eviction should not happen for block of the first file that is not
// persisted yet.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
assert(fs.exists(path1));
assert(fs.exists(path2));
verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
}
/**
* Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEvictionLRU()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(4 * BLOCK_SIZE -1)); // 3 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int NUM_PATHS = 6;
Path paths[] = new Path[NUM_PATHS];
for (int i = 0; i < NUM_PATHS; i++) {
paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
}
// No eviction for the first half of files
for (int i = 0; i < NUM_PATHS/2; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
// Lazy persist writer persists the first half of files
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Create the second half of files with eviction upon each create.
for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
// path[i-NUM_PATHS/2] is expected to be evicted by LRU
triggerBlockReport();
ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
}
}
/**
* Delete lazy-persist file that has not been persisted to disk.
* Memory is freed up and file is gone.
* @throws IOException
*/
@Test (timeout=300000)
public void testDeleteBeforePersist()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
stopLazyWriter(cluster.getDataNodes().get(0));
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks =
ensureFileReplicasOnStorageType(path, RAM_DISK);
// Delete before persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
}
/**
* Delete lazy-persist file that has been persisted to disk
* Both memory blocks and disk blocks are deleted.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testDeleteAfterPersist()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Delete after persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
triggerBlockReport();
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
}
/**
* RAM_DISK used/free space
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testDfsUsageCreateDelete()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
5 * BLOCK_SIZE - 1); // 4 replica + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
// Get the usage before write BLOCK_SIZE
long usedBeforeCreate = fs.getUsed();
makeTestFile(path, BLOCK_SIZE, true);
long usedAfterCreate = fs.getUsed();
assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
long usedAfterPersist = fs.getUsed();
assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
// Delete after persist
client.delete(path.toString(), false);
long usedAfterDelete = fs.getUsed();
assertThat(usedBeforeCreate, is(usedAfterDelete));
}
/**
* Concurrent read from the same node and verify the contents.
*/
@Test (timeout=300000)
public void testConcurrentRead()
throws Exception {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".dat");
final int SEED = 0xFADED;
final int NUM_TASKS = 5;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
//Read from multiple clients
final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
Runnable readerRunnable = new Runnable() {
@Override
public void run() {
try {
Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
} catch (Throwable e) {
LOG.error("readerRunnable error", e);
testFailed.set(true);
} finally {
latch.countDown();
}
}
};
Thread threads[] = new Thread[NUM_TASKS];
for (int i = 0; i < NUM_TASKS; i++) {
threads[i] = new Thread(readerRunnable);
threads[i].start();
}
Thread.sleep(500);
for (int i = 0; i < NUM_TASKS; i++) {
Uninterruptibles.joinUninterruptibly(threads[i]);
}
Assert.assertFalse(testFailed.get());
}
/**
* Concurrent write with eviction
* RAM_DISK can hold 9 replicas
* 4 threads each write 5 replicas
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testConcurrentWrites()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(10 * BLOCK_SIZE -1)); // 9 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
final int NUM_WRITERS = 4;
final int NUM_WRITER_PATHS = 5;
Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
for (int i = 0; i < NUM_WRITERS; i++) {
paths[i] = new Path[NUM_WRITER_PATHS];
for (int j = 0; j < NUM_WRITER_PATHS; j++) {
paths[i][j] =
new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
}
}
final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
for (int i = 0; i < NUM_WRITERS; i++) {
Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
testFailed);
executor.execute(writer);
}
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Stop executor from adding new tasks to finish existing threads in queue
latch.await();
assertThat(testFailed.get(), is(false));
}
@Test (timeout=300000) @Test (timeout=300000)
public void testDnRestartWithSavedReplicas() public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -384,11 +725,12 @@ public void testDnRestartWithUnsavedReplicas()
/** /**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If tmpfsStorageLimit < 0 then it is ignored. * capped. If ramDiskStorageLimit < 0 then it is ignored.
*/ */
private void startUpCluster(final int numDataNodes, private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes, final StorageType[] storageTypes,
final long ramDiskStorageLimit) final long ramDiskStorageLimit,
final boolean useSCR)
throws IOException { throws IOException {
conf = new Configuration(); conf = new Configuration();
@ -401,7 +743,9 @@ private void startUpCluster(final int numDataNodes,
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC); LAZY_WRITER_INTERVAL_SEC);
REPL_FACTOR = 1; //Reset if case a test has modified the value conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
REPL_FACTOR = 1; //Reset in case a test has modified the value
cluster = new MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf) .Builder(conf)
@ -411,7 +755,7 @@ private void startUpCluster(final int numDataNodes,
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
client = fs.getClient(); client = fs.getClient();
// Artifically cap the storage capacity of the tmpfs volume. // Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) { if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes = List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes(); cluster.getDataNodes().get(0).getFSDataset().getVolumes();
@ -426,6 +770,13 @@ private void startUpCluster(final int numDataNodes,
LOG.info("Cluster startup complete"); LOG.info("Cluster startup complete");
} }
private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit)
throws IOException {
startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist) private void makeTestFile(Path path, long length, final boolean isLazyPersist)
throws IOException { throws IOException {
@ -435,9 +786,7 @@ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
createFlags.add(LAZY_PERSIST); createFlags.add(LAZY_PERSIST);
} }
FSDataOutputStream fos = null; FSDataOutputStream fos = null;
try { try {
fos = fos =
fs.create(path, fs.create(path,
@ -465,13 +814,14 @@ private void makeTestFile(Path path, long length, final boolean isLazyPersist)
private LocatedBlocks ensureFileReplicasOnStorageType( private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException { Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct! // Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen(); long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks = LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength); client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType)); assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
} }
return locatedBlocks; return locatedBlocks;
} }
@ -480,4 +830,119 @@ private void stopLazyWriter(DataNode dn) {
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset()); FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop(); ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
} }
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
private boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
return true;
}
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
private void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
class WriterRunnable implements Runnable {
private final int id;
private final MiniDFSCluster cluster;
private final Path paths[];
private final int seed;
private CountDownLatch latch;
private AtomicBoolean bFail;
public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
int seed, CountDownLatch latch,
AtomicBoolean bFail) {
id = threadIndex;
this.cluster = cluster;
this.paths = paths;
this.seed = seed;
this.latch = latch;
this.bFail = bFail;
System.out.println("Creating Writer: " + id);
}
public void run() {
System.out.println("Writer " + id + " starting... ");
int i = 0;
try {
for (i = 0; i < paths.length; i++) {
makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
// eviction may faiL when all blocks are not persisted yet.
// ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
} catch (IOException e) {
bFail.set(true);
LOG.error("Writer exception: writer id:" + id +
" testfile: " + paths[i].toString() +
" " + e);
} finally {
latch.countDown();
}
}
}
} }