HDFS-6950. Add Additional unit tests for HDFS-6581. (Contributed by Xiaoyu Yao)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
This commit is contained in:
arp 2014-09-03 10:51:26 -07:00 committed by Jitendra Pandey
parent 754ba3a42d
commit 4609ba0dc8
2 changed files with 537 additions and 51 deletions

View File

@ -94,6 +94,11 @@ import java.util.*;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -305,16 +310,29 @@ public class DFSTestUtil {
public static void createFile(FileSystem fs, Path fileName, int bufferLen,
long fileLen, long blockSize, short replFactor, long seed)
throws IOException {
createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
replFactor, seed, false);
}
public static void createFile(FileSystem fs, Path fileName,
boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
short replFactor, long seed, boolean flush) throws IOException {
assert bufferLen > 0;
if (!fs.mkdirs(fileName.getParent())) {
throw new IOException("Mkdirs failed to create " +
fileName.getParent().toString());
}
FSDataOutputStream out = null;
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
createFlags.add(OVERWRITE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
try {
out = fs.create(fileName, true, fs.getConf()
.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
replFactor, blockSize);
out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
replFactor, blockSize, null);
if (fileLen > 0) {
byte[] toWrite = new byte[bufferLen];
Random rb = new Random(seed);
@ -327,6 +345,9 @@ public class DFSTestUtil {
out.write(toWrite, 0, bytesToWriteNext);
bytesToWrite -= bytesToWriteNext;
}
if (flush) {
out.hsync();
}
}
} finally {
if (out != null) {

View File

@ -16,46 +16,49 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class TestLazyPersistFiles {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
@ -66,8 +69,10 @@ public class TestLazyPersistFiles {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private static final int THREADPOOL_SIZE = 10;
private static short REPL_FACTOR = 1;
private static final long BLOCK_SIZE = 10485760; // 10 MB
private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@ -161,6 +166,26 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path, RAM_DISK);
}
@Test (timeout=300000)
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true);
makeTestFile(path2, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, RAM_DISK);
}
/**
* Client tries to write LAZY_PERSIST to same DN with no RamDisk configured
* Write should default to disk. No error.
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDisk() throws IOException {
startUpCluster(REPL_FACTOR, null, -1);
@ -171,6 +196,59 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path, DEFAULT);
}
/**
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDiskFull() throws IOException {
startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
}
/**
* File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
* Expect 2 blocks are on RamDisk whereas other 3 on disk.
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
new StorageType[] { RAM_DISK, DEFAULT },
BLOCK_SIZE * 3 - 1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE * 5, true);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
int numBlocksOnRamDisk = 0;
int numBlocksOnDisk = 0;
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
numBlocksOnRamDisk++;
}else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
numBlocksOnDisk++;
}
}
assertThat(numBlocksOnRamDisk, is(2));
assertThat(numBlocksOnDisk, is(3));
}
/**
* If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
* specified, then block placement should fail.
@ -191,6 +269,10 @@ public class TestLazyPersistFiles {
}
}
/**
* Append to lazy persist file is denied.
* @throws IOException
*/
@Test (timeout=300000)
public void testAppendIsDenied() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
@ -216,7 +298,7 @@ public class TestLazyPersistFiles {
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
new StorageType[] {RAM_DISK, DEFAULT },
new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@ -256,7 +338,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@ -302,7 +384,12 @@ public class TestLazyPersistFiles {
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
/**
* RamDisk eviction after lazy persist to disk.
* Evicted blocks are still readable with on-disk replicas.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEviction()
throws IOException, InterruptedException {
@ -313,7 +400,8 @@ public class TestLazyPersistFiles {
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true);
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job.
@ -323,15 +411,268 @@ public class TestLazyPersistFiles {
// Create another file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true);
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
triggerBlockReport();
// Make sure that the second file's block replica is on RAM_DISK, whereas
// the original file's block replica is now on disk.
ensureFileReplicasOnStorageType(path2, RAM_DISK);
// ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
/**
* RamDisk eviction should not happen on blocks that are not yet
* persisted on disk.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException {
// 1 replica + delta, lazy persist interval every 50 minutes
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1));
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0XFADED;
// Stop lazy writer to ensure block for path1 is not persisted to disk.
stopLazyWriter(cluster.getDataNodes().get(0));
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create second file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true);
// Eviction should not happen for block of the first file that is not
// persisted yet.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
assert(fs.exists(path1));
assert(fs.exists(path2));
verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
}
/**
* Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testRamDiskEvictionLRU()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(4 * BLOCK_SIZE -1)); // 3 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int NUM_PATHS = 6;
Path paths[] = new Path[NUM_PATHS];
for (int i = 0; i < NUM_PATHS; i++) {
paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
}
// No eviction for the first half of files
for (int i = 0; i < NUM_PATHS/2; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
// Lazy persist writer persists the first half of files
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Create the second half of files with eviction upon each create.
for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
// path[i-NUM_PATHS/2] is expected to be evicted by LRU
triggerBlockReport();
ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
}
}
/**
* Delete lazy-persist file that has not been persisted to disk.
* Memory is freed up and file is gone.
* @throws IOException
*/
@Test (timeout=300000)
public void testDeleteBeforePersist()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
stopLazyWriter(cluster.getDataNodes().get(0));
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks =
ensureFileReplicasOnStorageType(path, RAM_DISK);
// Delete before persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
}
/**
* Delete lazy-persist file that has been persisted to disk
* Both memory blocks and disk blocks are deleted.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testDeleteAfterPersist()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Delete after persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
triggerBlockReport();
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
}
/**
* RAM_DISK used/free space
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testDfsUsageCreateDelete()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
5 * BLOCK_SIZE - 1); // 4 replica + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
// Get the usage before write BLOCK_SIZE
long usedBeforeCreate = fs.getUsed();
makeTestFile(path, BLOCK_SIZE, true);
long usedAfterCreate = fs.getUsed();
assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
long usedAfterPersist = fs.getUsed();
assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
// Delete after persist
client.delete(path.toString(), false);
long usedAfterDelete = fs.getUsed();
assertThat(usedBeforeCreate, is(usedAfterDelete));
}
/**
* Concurrent read from the same node and verify the contents.
*/
@Test (timeout=300000)
public void testConcurrentRead()
throws Exception {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".dat");
final int SEED = 0xFADED;
final int NUM_TASKS = 5;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
//Read from multiple clients
final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
Runnable readerRunnable = new Runnable() {
@Override
public void run() {
try {
Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
} catch (Throwable e) {
LOG.error("readerRunnable error", e);
testFailed.set(true);
} finally {
latch.countDown();
}
}
};
Thread threads[] = new Thread[NUM_TASKS];
for (int i = 0; i < NUM_TASKS; i++) {
threads[i] = new Thread(readerRunnable);
threads[i].start();
}
Thread.sleep(500);
for (int i = 0; i < NUM_TASKS; i++) {
Uninterruptibles.joinUninterruptibly(threads[i]);
}
Assert.assertFalse(testFailed.get());
}
/**
* Concurrent write with eviction
* RAM_DISK can hold 9 replicas
* 4 threads each write 5 replicas
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
public void testConcurrentWrites()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(10 * BLOCK_SIZE -1)); // 9 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
final int NUM_WRITERS = 4;
final int NUM_WRITER_PATHS = 5;
Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
for (int i = 0; i < NUM_WRITERS; i++) {
paths[i] = new Path[NUM_WRITER_PATHS];
for (int j = 0; j < NUM_WRITER_PATHS; j++) {
paths[i][j] =
new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
}
}
final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
for (int i = 0; i < NUM_WRITERS; i++) {
Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
testFailed);
executor.execute(writer);
}
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Stop executor from adding new tasks to finish existing threads in queue
latch.await();
assertThat(testFailed.get(), is(false));
}
@Test (timeout=300000)
public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
@ -384,11 +725,12 @@ public class TestLazyPersistFiles {
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If tmpfsStorageLimit < 0 then it is ignored.
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit)
final long ramDiskStorageLimit,
final boolean useSCR)
throws IOException {
conf = new Configuration();
@ -401,7 +743,9 @@ public class TestLazyPersistFiles {
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
REPL_FACTOR = 1; //Reset if case a test has modified the value
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
REPL_FACTOR = 1; //Reset in case a test has modified the value
cluster = new MiniDFSCluster
.Builder(conf)
@ -411,7 +755,7 @@ public class TestLazyPersistFiles {
fs = cluster.getFileSystem();
client = fs.getClient();
// Artifically cap the storage capacity of the tmpfs volume.
// Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
@ -426,6 +770,13 @@ public class TestLazyPersistFiles {
LOG.info("Cluster startup complete");
}
private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit)
throws IOException {
startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
throws IOException {
@ -435,9 +786,7 @@ public class TestLazyPersistFiles {
createFlags.add(LAZY_PERSIST);
}
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
@ -465,13 +814,14 @@ public class TestLazyPersistFiles {
private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
}
@ -480,4 +830,119 @@ public class TestLazyPersistFiles {
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
}
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
private boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
return true;
}
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
private void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
class WriterRunnable implements Runnable {
private final int id;
private final MiniDFSCluster cluster;
private final Path paths[];
private final int seed;
private CountDownLatch latch;
private AtomicBoolean bFail;
public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
int seed, CountDownLatch latch,
AtomicBoolean bFail) {
id = threadIndex;
this.cluster = cluster;
this.paths = paths;
this.seed = seed;
this.latch = latch;
this.bFail = bFail;
System.out.println("Creating Writer: " + id);
}
public void run() {
System.out.println("Writer " + id + " starting... ");
int i = 0;
try {
for (i = 0; i < paths.length; i++) {
makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
// eviction may faiL when all blocks are not persisted yet.
// ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
} catch (IOException e) {
bFail.set(true);
LOG.error("Writer exception: writer id:" + id +
" testfile: " + paths[i].toString() +
" " + e);
} finally {
latch.countDown();
}
}
}
}