HDFS-7475. Make TestLazyPersistFiles#testLazyPersistBlocksAreSaved deterministic. (Contributed by Xiaoyu Yao)
This commit is contained in:
parent
87b3fc8cae
commit
d39809016d
|
@ -310,6 +310,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-5578. [JDK8] Fix Javadoc errors caused by incorrect or illegal tags
|
HDFS-5578. [JDK8] Fix Javadoc errors caused by incorrect or illegal tags
|
||||||
in doc comments. (Andrew Purtell via wheat9)
|
in doc comments. (Andrew Purtell via wheat9)
|
||||||
|
|
||||||
|
HDFS-7475. Make TestLazyPersistFiles#testLazyPersistBlocksAreSaved
|
||||||
|
deterministic. (Xiaoyu Yao via Arpit Agarwal)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -50,6 +50,8 @@ import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
@ -131,6 +133,48 @@ public abstract class LazyPersistTestCase {
|
||||||
return locatedBlocks;
|
return locatedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure at least one non-transient volume has a saved copy of the replica.
|
||||||
|
* An infinite loop is used to ensure the async lazy persist tasks are completely
|
||||||
|
* done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
|
||||||
|
* either a successful pass or timeout failure.
|
||||||
|
*/
|
||||||
|
protected final void ensureLazyPersistBlocksAreSaved(
|
||||||
|
LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
|
||||||
|
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
List<? extends FsVolumeSpi> volumes =
|
||||||
|
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||||
|
final Set<Long> persistedBlockIds = new HashSet<Long>();
|
||||||
|
|
||||||
|
while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) {
|
||||||
|
// Take 1 second sleep before each verification iteration
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
for (FsVolumeSpi v : volumes) {
|
||||||
|
if (v.isTransientStorage()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
FsVolumeImpl volume = (FsVolumeImpl) v;
|
||||||
|
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
|
||||||
|
|
||||||
|
long blockId = lb.getBlock().getBlockId();
|
||||||
|
File targetDir =
|
||||||
|
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
|
||||||
|
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
|
||||||
|
if (blockFile.exists()) {
|
||||||
|
// Found a persisted copy for this block and added to the Set
|
||||||
|
persistedBlockIds.add(blockId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should have found a persisted copy for each located block.
|
||||||
|
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
|
||||||
|
}
|
||||||
|
|
||||||
protected final void makeRandomTestFile(Path path, long length,
|
protected final void makeRandomTestFile(Path path, long length,
|
||||||
boolean isLazyPersist, long seed) throws IOException {
|
boolean isLazyPersist, long seed) throws IOException {
|
||||||
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||||
|
|
|
@ -304,37 +304,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
|
||||||
|
|
||||||
// Make sure that there is a saved copy of the replica on persistent
|
// Make sure that there is a saved copy of the replica on persistent
|
||||||
// storage.
|
// storage.
|
||||||
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
ensureLazyPersistBlocksAreSaved(locatedBlocks);
|
||||||
List<? extends FsVolumeSpi> volumes =
|
|
||||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
|
||||||
|
|
||||||
final Set<Long> persistedBlockIds = new HashSet<Long>();
|
|
||||||
|
|
||||||
// Make sure at least one non-transient volume has a saved copy of
|
|
||||||
// the replica.
|
|
||||||
for (FsVolumeSpi v : volumes) {
|
|
||||||
if (v.isTransientStorage()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) v;
|
|
||||||
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
|
|
||||||
|
|
||||||
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
|
||||||
File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
|
|
||||||
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
|
|
||||||
if (blockFile.exists()) {
|
|
||||||
// Found a persisted copy for this block!
|
|
||||||
boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
|
|
||||||
assertThat(added, is(true));
|
|
||||||
} else {
|
|
||||||
LOG.error(blockFile + " not found");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We should have found a persisted copy for each located block.
|
|
||||||
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue