HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal)
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
This commit is contained in:
parent
d973656e5d
commit
754ba3a42d
|
@ -94,17 +94,6 @@ class BlockPoolSlice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all checkpointed replicas on startup.
|
|
||||||
// TODO: We can move checkpointed replicas to the finalized dir and delete
|
|
||||||
// the copy on RAM_DISK. For now we take the simpler approach.
|
|
||||||
|
|
||||||
FileUtil.fullyDelete(lazypersistDir);
|
|
||||||
if (!this.lazypersistDir.exists()) {
|
|
||||||
if (!this.lazypersistDir.mkdirs()) {
|
|
||||||
throw new IOException("Failed to mkdirs " + this.lazypersistDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Files that were being written when the datanode was last shutdown
|
// Files that were being written when the datanode was last shutdown
|
||||||
// are now moved back to the data directory. It is possible that
|
// are now moved back to the data directory. It is possible that
|
||||||
// in the future, we might want to do some sort of datanode-local
|
// in the future, we might want to do some sort of datanode-local
|
||||||
|
@ -277,6 +266,13 @@ class BlockPoolSlice {
|
||||||
return blockFile;
|
return blockFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save the given replica to persistent storage.
|
||||||
|
*
|
||||||
|
* @param replicaInfo
|
||||||
|
* @return The saved block file.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
|
File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
|
||||||
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
|
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
|
||||||
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
|
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
|
||||||
|
@ -311,11 +307,21 @@ class BlockPoolSlice {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
void getVolumeMap(ReplicaMap volumeMap,
|
||||||
|
final LazyWriteReplicaTracker lazyWriteReplicaMap)
|
||||||
|
throws IOException {
|
||||||
|
// Recover lazy persist replicas, they will be added to the volumeMap
|
||||||
|
// when we scan the finalized directory.
|
||||||
|
if (lazypersistDir.exists()) {
|
||||||
|
int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
|
||||||
|
FsDatasetImpl.LOG.info(
|
||||||
|
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
|
||||||
|
}
|
||||||
|
|
||||||
// add finalized replicas
|
// add finalized replicas
|
||||||
addToReplicasMap(volumeMap, finalizedDir, true);
|
addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
|
||||||
// add rbw replicas
|
// add rbw replicas
|
||||||
addToReplicasMap(volumeMap, rbwDir, false);
|
addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -343,19 +349,69 @@ class BlockPoolSlice {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move replicas in the lazy persist directory to their corresponding locations
|
||||||
|
* in the finalized directory.
|
||||||
|
* @return number of replicas recovered.
|
||||||
|
*/
|
||||||
|
private int moveLazyPersistReplicasToFinalized(File source)
|
||||||
|
throws IOException {
|
||||||
|
File files[] = FileUtil.listFiles(source);
|
||||||
|
int numRecovered = 0;
|
||||||
|
for (File file : files) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
numRecovered += moveLazyPersistReplicasToFinalized(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Block.isMetaFilename(file.getName())) {
|
||||||
|
File metaFile = file;
|
||||||
|
File blockFile = Block.metaToBlockFile(metaFile);
|
||||||
|
long blockId = Block.filename2id(blockFile.getName());
|
||||||
|
File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
|
||||||
|
|
||||||
|
if (blockFile.exists()) {
|
||||||
|
File targetBlockFile = new File(targetDir, blockFile.getName());
|
||||||
|
File targetMetaFile = new File(targetDir, metaFile.getName());
|
||||||
|
|
||||||
|
if (!targetDir.exists() && !targetDir.mkdirs()) {
|
||||||
|
FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
metaFile.renameTo(targetMetaFile);
|
||||||
|
blockFile.renameTo(targetBlockFile);
|
||||||
|
|
||||||
|
if (targetBlockFile.exists() && targetMetaFile.exists()) {
|
||||||
|
++numRecovered;
|
||||||
|
} else {
|
||||||
|
// Failure should be rare.
|
||||||
|
FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FileUtil.fullyDelete(source);
|
||||||
|
return numRecovered;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add replicas under the given directory to the volume map
|
* Add replicas under the given directory to the volume map
|
||||||
* @param volumeMap the replicas map
|
* @param volumeMap the replicas map
|
||||||
* @param dir an input directory
|
* @param dir an input directory
|
||||||
|
* @param lazyWriteReplicaMap Map of replicas on transient
|
||||||
|
* storage.
|
||||||
* @param isFinalized true if the directory has finalized replicas;
|
* @param isFinalized true if the directory has finalized replicas;
|
||||||
* false if the directory has rbw replicas
|
* false if the directory has rbw replicas
|
||||||
*/
|
*/
|
||||||
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
|
void addToReplicasMap(ReplicaMap volumeMap, File dir,
|
||||||
) throws IOException {
|
final LazyWriteReplicaTracker lazyWriteReplicaMap,
|
||||||
|
boolean isFinalized)
|
||||||
|
throws IOException {
|
||||||
File files[] = FileUtil.listFiles(dir);
|
File files[] = FileUtil.listFiles(dir);
|
||||||
for (File file : files) {
|
for (File file : files) {
|
||||||
if (file.isDirectory()) {
|
if (file.isDirectory()) {
|
||||||
addToReplicasMap(volumeMap, file, isFinalized);
|
addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
|
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
|
||||||
|
@ -413,13 +469,83 @@ class BlockPoolSlice {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
|
ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
|
||||||
if (oldReplica != null) {
|
if (oldReplica == null) {
|
||||||
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
|
volumeMap.add(bpid, newReplica);
|
||||||
"on disk: " + oldReplica.getBlockFile() + " and " + file );
|
} else {
|
||||||
|
// We have multiple replicas of the same block so decide which one
|
||||||
|
// to keep.
|
||||||
|
newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we are retaining a replica on transient storage make sure
|
||||||
|
// it is in the lazyWriteReplicaMap so it can be persisted
|
||||||
|
// eventually.
|
||||||
|
if (newReplica.getVolume().isTransientStorage()) {
|
||||||
|
lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
|
||||||
|
} else {
|
||||||
|
lazyWriteReplicaMap.discardReplica(bpid, blockId, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is invoked during DN startup when volumes are scanned to
|
||||||
|
* build up the volumeMap.
|
||||||
|
*
|
||||||
|
* Given two replicas, decide which one to keep. The preference is as
|
||||||
|
* follows:
|
||||||
|
* 1. Prefer the replica with the higher generation stamp.
|
||||||
|
* 2. If generation stamps are equal, prefer the replica with the
|
||||||
|
* larger on-disk length.
|
||||||
|
* 3. If on-disk length is the same, prefer the replica on persistent
|
||||||
|
* storage volume.
|
||||||
|
* 4. All other factors being equal, keep replica1.
|
||||||
|
*
|
||||||
|
* The other replica is removed from the volumeMap and is deleted from
|
||||||
|
* its storage volume.
|
||||||
|
*
|
||||||
|
* @param replica1
|
||||||
|
* @param replica2
|
||||||
|
* @param volumeMap
|
||||||
|
* @return the replica that is retained.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ReplicaInfo resolveDuplicateReplicas(
|
||||||
|
final ReplicaInfo replica1, final ReplicaInfo replica2,
|
||||||
|
final ReplicaMap volumeMap) throws IOException {
|
||||||
|
|
||||||
|
ReplicaInfo replicaToKeep;
|
||||||
|
ReplicaInfo replicaToDelete;
|
||||||
|
|
||||||
|
if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
|
||||||
|
replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
|
||||||
|
? replica1 : replica2;
|
||||||
|
} else if (replica1.getNumBytes() != replica2.getNumBytes()) {
|
||||||
|
replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
|
||||||
|
replica1 : replica2;
|
||||||
|
} else if (replica1.getVolume().isTransientStorage() &&
|
||||||
|
!replica2.getVolume().isTransientStorage()) {
|
||||||
|
replicaToKeep = replica2;
|
||||||
|
} else {
|
||||||
|
replicaToKeep = replica1;
|
||||||
|
}
|
||||||
|
|
||||||
|
replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
|
||||||
|
|
||||||
|
// Update volumeMap.
|
||||||
|
volumeMap.add(bpid, replicaToKeep);
|
||||||
|
|
||||||
|
// Delete the files on disk. Failure here is okay.
|
||||||
|
replicaToDelete.getBlockFile().delete();
|
||||||
|
replicaToDelete.getMetaFile().delete();
|
||||||
|
|
||||||
|
FsDatasetImpl.LOG.info(
|
||||||
|
"resolveDuplicateReplicas keeping " + replicaToKeep.getBlockFile() +
|
||||||
|
", deleting " + replicaToDelete.getBlockFile());
|
||||||
|
|
||||||
|
return replicaToKeep;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find out the number of bytes in the block that match its crc.
|
* Find out the number of bytes in the block that match its crc.
|
||||||
|
|
|
@ -275,13 +275,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
|
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
|
||||||
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
|
||||||
|
|
||||||
// TODO: Initialize transientReplicaTracker from blocks on disk.
|
|
||||||
|
|
||||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||||
addVolume(dataLocations, storage.getStorageDir(idx));
|
addVolume(dataLocations, storage.getStorageDir(idx));
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheManager = new FsDatasetCache(this);
|
cacheManager = new FsDatasetCache(this);
|
||||||
|
|
||||||
|
// Start the lazy writer once we have built the replica maps.
|
||||||
lazyWriter = new Daemon(new LazyWriter(
|
lazyWriter = new Daemon(new LazyWriter(
|
||||||
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||||
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
|
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
|
||||||
|
@ -301,7 +301,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
|
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
|
||||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||||
ReplicaMap tempVolumeMap = new ReplicaMap(this);
|
ReplicaMap tempVolumeMap = new ReplicaMap(this);
|
||||||
fsVolume.getVolumeMap(tempVolumeMap);
|
fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
|
||||||
|
|
||||||
volumeMap.addAll(tempVolumeMap);
|
volumeMap.addAll(tempVolumeMap);
|
||||||
volumes.addVolume(fsVolume);
|
volumes.addVolume(fsVolume);
|
||||||
|
@ -2194,7 +2194,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
volumes.addBlockPool(bpid, conf);
|
volumes.addBlockPool(bpid, conf);
|
||||||
volumeMap.initBlockPool(bpid);
|
volumeMap.initBlockPool(bpid);
|
||||||
}
|
}
|
||||||
volumes.getAllVolumesMap(bpid, volumeMap);
|
volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2444,6 +2444,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
LOG.warn("Exception saving replica " + replicaState, ioe);
|
LOG.warn("Exception saving replica " + replicaState, ioe);
|
||||||
} finally {
|
} finally {
|
||||||
if (!succeeded && replicaState != null) {
|
if (!succeeded && replicaState != null) {
|
||||||
|
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
|
||||||
lazyWriteReplicaTracker.reenqueueReplica(replicaState);
|
lazyWriteReplicaTracker.reenqueueReplica(replicaState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,14 +297,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
void getVolumeMap(ReplicaMap volumeMap,
|
||||||
|
final LazyWriteReplicaTracker lazyWriteReplicaMap)
|
||||||
|
throws IOException {
|
||||||
for(BlockPoolSlice s : bpSlices.values()) {
|
for(BlockPoolSlice s : bpSlices.values()) {
|
||||||
s.getVolumeMap(volumeMap);
|
s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
|
void getVolumeMap(String bpid, ReplicaMap volumeMap,
|
||||||
getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
|
final LazyWriteReplicaTracker lazyWriteReplicaMap)
|
||||||
|
throws IOException {
|
||||||
|
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -119,7 +119,10 @@ class FsVolumeList {
|
||||||
return remaining;
|
return remaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
|
void getAllVolumesMap(final String bpid,
|
||||||
|
final ReplicaMap volumeMap,
|
||||||
|
final LazyWriteReplicaTracker lazyWriteReplicaMap)
|
||||||
|
throws IOException {
|
||||||
long totalStartTime = Time.monotonicNow();
|
long totalStartTime = Time.monotonicNow();
|
||||||
final List<IOException> exceptions = Collections.synchronizedList(
|
final List<IOException> exceptions = Collections.synchronizedList(
|
||||||
new ArrayList<IOException>());
|
new ArrayList<IOException>());
|
||||||
|
@ -131,7 +134,7 @@ class FsVolumeList {
|
||||||
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
|
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
|
||||||
bpid + " on volume " + v + "...");
|
bpid + " on volume " + v + "...");
|
||||||
long startTime = Time.monotonicNow();
|
long startTime = Time.monotonicNow();
|
||||||
v.getVolumeMap(bpid, volumeMap);
|
v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
|
||||||
long timeTaken = Time.monotonicNow() - startTime;
|
long timeTaken = Time.monotonicNow() - startTime;
|
||||||
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
|
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
|
||||||
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
|
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
|
||||||
|
@ -160,17 +163,6 @@ class FsVolumeList {
|
||||||
+ totalTimeTaken + "ms");
|
+ totalTimeTaken + "ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
|
|
||||||
throws IOException {
|
|
||||||
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
|
|
||||||
" on volume " + volume + "...");
|
|
||||||
long startTime = Time.monotonicNow();
|
|
||||||
volume.getVolumeMap(bpid, volumeMap);
|
|
||||||
long timeTaken = Time.monotonicNow() - startTime;
|
|
||||||
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
|
|
||||||
" on volume " + volume + ": " + timeTaken + "ms");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
|
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
|
||||||
* volumes from the active list that result in a DiskErrorException.
|
* volumes from the active list that result in a DiskErrorException.
|
||||||
|
|
|
@ -265,7 +265,9 @@ public class TestLazyPersistFiles {
|
||||||
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
|
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
|
||||||
|
|
||||||
// Sleep for a short time to allow the lazy writer thread to do its job
|
// Sleep for a short time to allow the lazy writer thread to do its job
|
||||||
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
|
||||||
|
LOG.info("Verifying copy was saved to lazyPersist/");
|
||||||
|
|
||||||
// Make sure that there is a saved copy of the replica on persistent
|
// Make sure that there is a saved copy of the replica on persistent
|
||||||
// storage.
|
// storage.
|
||||||
|
@ -330,23 +332,52 @@ public class TestLazyPersistFiles {
|
||||||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO: Stub test, to be completed.
|
|
||||||
* Verify that checksum computation is skipped for files written to memory.
|
|
||||||
*/
|
|
||||||
@Test (timeout=300000)
|
@Test (timeout=300000)
|
||||||
public void testChecksumIsSkipped()
|
public void testDnRestartWithSavedReplicas()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
startUpCluster(REPL_FACTOR,
|
startUpCluster(REPL_FACTOR,
|
||||||
new StorageType[] {RAM_DISK, DEFAULT }, -1);
|
new StorageType[] {RAM_DISK, DEFAULT },
|
||||||
|
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
|
||||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
|
||||||
makeTestFile(path1, BLOCK_SIZE, true);
|
makeTestFile(path1, BLOCK_SIZE, true);
|
||||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
// Verify checksum was not computed.
|
// Sleep for a short time to allow the lazy writer thread to do its job.
|
||||||
|
// However the block replica should not be evicted from RAM_DISK yet.
|
||||||
|
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
|
LOG.info("Restarting the DataNode");
|
||||||
|
cluster.restartDataNode(0, true);
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Ensure that the replica is now on persistent storage.
|
||||||
|
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testDnRestartWithUnsavedReplicas()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
startUpCluster(REPL_FACTOR,
|
||||||
|
new StorageType[] {RAM_DISK, DEFAULT },
|
||||||
|
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
|
||||||
|
stopLazyWriter(cluster.getDataNodes().get(0));
|
||||||
|
|
||||||
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||||
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||||
|
makeTestFile(path1, BLOCK_SIZE, true);
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
|
|
||||||
|
LOG.info("Restarting the DataNode");
|
||||||
|
cluster.restartDataNode(0, true);
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Ensure that the replica is still on transient storage.
|
||||||
|
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Utility functions for all test cases -------------------------------
|
// ---- Utility functions for all test cases -------------------------------
|
||||||
|
@ -443,4 +474,10 @@ public class TestLazyPersistFiles {
|
||||||
|
|
||||||
return locatedBlocks;
|
return locatedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void stopLazyWriter(DataNode dn) {
|
||||||
|
// Stop the lazyWriter daemon.
|
||||||
|
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
|
||||||
|
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue