HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal)

This commit is contained in:
arp 2014-08-28 23:13:46 -07:00
parent 4cf9afacbe
commit c92837aeab
6 changed files with 213 additions and 50 deletions

View File

@ -27,3 +27,6 @@
HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
(Arpit Agarwal) (Arpit Agarwal)
HDFS-6931. Move lazily persisted replicas to finalized directory on DN
startup. (Arpit Agarwal)

View File

@ -94,17 +94,6 @@ class BlockPoolSlice {
} }
} }
// Delete all checkpointed replicas on startup.
// TODO: We can move checkpointed replicas to the finalized dir and delete
// the copy on RAM_DISK. For now we take the simpler approach.
FileUtil.fullyDelete(lazypersistDir);
if (!this.lazypersistDir.exists()) {
if (!this.lazypersistDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + this.lazypersistDir);
}
}
// Files that were being written when the datanode was last shutdown // Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that // are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local // in the future, we might want to do some sort of datanode-local
@ -271,6 +260,13 @@ File addBlock(Block b, File f) throws IOException {
return blockFile; return blockFile;
} }
/**
* Save the given replica to persistent storage.
*
* @param replicaInfo
* @return The saved block file.
* @throws IOException
*/
File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException { File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) { if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir); FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
@ -305,11 +301,21 @@ void checkDirs() throws DiskErrorException {
void getVolumeMap(ReplicaMap volumeMap) throws IOException { void getVolumeMap(ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
throws IOException {
// Recover lazy persist replicas, they will be added to the volumeMap
// when we scan the finalized directory.
if (lazypersistDir.exists()) {
int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
FsDatasetImpl.LOG.info(
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
// add finalized replicas // add finalized replicas
addToReplicasMap(volumeMap, finalizedDir, true); addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
// add rbw replicas // add rbw replicas
addToReplicasMap(volumeMap, rbwDir, false); addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
} }
/** /**
@ -337,19 +343,69 @@ File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
} }
/**
 * Move replicas found under the lazy persist directory to their
 * corresponding block-id based subdirectories under the finalized
 * directory. The source directory tree is deleted afterwards, so any
 * replica that could not be moved is discarded with it.
 *
 * @param source the directory to scan (initially the lazypersist dir);
 *               recursed into for nested subdirectories.
 * @return number of replicas recovered (block file + meta file pairs moved).
 * @throws IOException if listing the directory fails.
 */
private int moveLazyPersistReplicasToFinalized(File source)
    throws IOException {
  File[] files = FileUtil.listFiles(source);
  int numRecovered = 0;
  for (File file : files) {
    if (file.isDirectory()) {
      // Depth-first: recover replicas in subdirectories before this level.
      numRecovered += moveLazyPersistReplicasToFinalized(file);
    }
    if (Block.isMetaFilename(file.getName())) {
      File metaFile = file;
      File blockFile = Block.metaToBlockFile(metaFile);
      long blockId = Block.filename2id(blockFile.getName());
      File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
      if (blockFile.exists()) {
        File targetBlockFile = new File(targetDir, blockFile.getName());
        File targetMetaFile = new File(targetDir, metaFile.getName());

        if (!targetDir.exists() && !targetDir.mkdirs()) {
          // Cannot create the destination directory; skip this replica.
          // It will be removed along with the rest of the source tree.
          FsDatasetImpl.LOG.warn(
              "Failed to mkdirs " + targetDir + "; skipping recovery of " +
              blockFile);
          continue;
        }

        // Check the renameTo() results directly instead of probing
        // exists() afterwards: a pre-existing target file would make the
        // exists() probe report success even when the rename failed.
        boolean metaRenamed = metaFile.renameTo(targetMetaFile);
        boolean blockRenamed = blockFile.renameTo(targetBlockFile);

        if (metaRenamed && blockRenamed) {
          ++numRecovered;
        } else {
          // Failure should be rare.
          FsDatasetImpl.LOG.warn(
              "Failed to move " + blockFile + " to " + targetDir +
              " (metaRenamed=" + metaRenamed +
              ", blockRenamed=" + blockRenamed + ")");
        }
      }
    }
  }

  // Remove the (now hopefully empty) source tree. Replicas that failed to
  // move are intentionally discarded here.
  FileUtil.fullyDelete(source);
  return numRecovered;
}
/** /**
* Add replicas under the given directory to the volume map * Add replicas under the given directory to the volume map
* @param volumeMap the replicas map * @param volumeMap the replicas map
* @param dir an input directory * @param dir an input directory
* @param lazyWriteReplicaMap Map of replicas on transient
* storage.
* @param isFinalized true if the directory has finalized replicas; * @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas * false if the directory has rbw replicas
*/ */
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized void addToReplicasMap(ReplicaMap volumeMap, File dir,
) throws IOException { final LazyWriteReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
File files[] = FileUtil.listFiles(dir); File files[] = FileUtil.listFiles(dir);
for (File file : files) { for (File file : files) {
if (file.isDirectory()) { if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, isFinalized); addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
} }
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) { if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@ -405,14 +461,84 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
} }
} }
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
if (oldReplica != null) { if (oldReplica == null) {
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " + volumeMap.add(bpid, newReplica);
"on disk: " + oldReplica.getBlockFile() + " and " + file ); } else {
// We have multiple replicas of the same block so decide which one
// to keep.
newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
}
// If we are retaining a replica on transient storage make sure
// it is in the lazyWriteReplicaMap so it can be persisted
// eventually.
if (newReplica.getVolume().isTransientStorage()) {
lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
} else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, true);
} }
} }
} }
/**
 * Invoked during DN startup while volumes are scanned to build the
 * volumeMap.
 *
 * Given two replicas of the same block, decides which one to retain,
 * preferring in order:
 *  1. the replica with the higher generation stamp;
 *  2. if generation stamps are equal, the replica with the larger
 *     on-disk length;
 *  3. if lengths are also equal, the replica on a persistent
 *     (non-transient) storage volume;
 *  4. all else being equal, replica1.
 *
 * The losing replica is dropped from the volumeMap and its block and meta
 * files are deleted from its volume (best effort).
 *
 * @param replica1 first candidate replica.
 * @param replica2 second candidate replica.
 * @param volumeMap updated to reference the retained replica.
 * @return the replica that is retained.
 * @throws IOException
 */
private ReplicaInfo resolveDuplicateReplicas(
    final ReplicaInfo replica1, final ReplicaInfo replica2,
    final ReplicaMap volumeMap) throws IOException {

  final long genStamp1 = replica1.getGenerationStamp();
  final long genStamp2 = replica2.getGenerationStamp();

  final ReplicaInfo retained;
  if (genStamp1 != genStamp2) {
    // Rule 1: higher generation stamp wins.
    retained = (genStamp1 > genStamp2) ? replica1 : replica2;
  } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
    // Rule 2: larger on-disk length wins.
    retained = (replica1.getNumBytes() > replica2.getNumBytes())
        ? replica1 : replica2;
  } else if (replica1.getVolume().isTransientStorage()
      && !replica2.getVolume().isTransientStorage()) {
    // Rule 3: prefer the copy on persistent storage.
    retained = replica2;
  } else {
    // Rule 4: no distinguishing factor; keep replica1.
    retained = replica1;
  }

  final ReplicaInfo discarded =
      (retained == replica1) ? replica2 : replica1;

  // Point the volumeMap at the winner.
  volumeMap.add(bpid, retained);

  // Best-effort removal of the loser's files; failure here is okay.
  discarded.getBlockFile().delete();
  discarded.getMetaFile().delete();

  FsDatasetImpl.LOG.info(
      "resolveDuplicateReplicas keeping " + retained.getBlockFile() +
      ", deleting " + discarded.getBlockFile());

  return retained;
}
/** /**
* Find out the number of bytes in the block that match its crc. * Find out the number of bytes in the block that match its crc.
* *

View File

@ -262,13 +262,13 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
volumes = new FsVolumeList(volsFailed, blockChooserImpl); volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncDiskService = new FsDatasetAsyncDiskService(datanode);
// TODO: Initialize transientReplicaTracker from blocks on disk.
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx)); addVolume(dataLocations, storage.getStorageDir(idx));
} }
cacheManager = new FsDatasetCache(this); cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
lazyWriter = new Daemon(new LazyWriter( lazyWriter = new Daemon(new LazyWriter(
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC))); DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
@ -287,7 +287,7 @@ private void addVolume(Collection<StorageLocation> dataLocations,
// storageMap and asyncDiskService, consistent. // storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume( FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType); this, sd.getStorageUuid(), dir, this.conf, storageType);
fsVolume.getVolumeMap(volumeMap); fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
volumes.addVolume(fsVolume); volumes.addVolume(fsVolume);
storageMap.put(sd.getStorageUuid(), storageMap.put(sd.getStorageUuid(),
@ -2021,7 +2021,7 @@ public void addBlockPool(String bpid, Configuration conf)
volumes.addBlockPool(bpid, conf); volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid); volumeMap.initBlockPool(bpid);
} }
volumes.getAllVolumesMap(bpid, volumeMap); volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
} }
@Override @Override
@ -2261,6 +2261,7 @@ private boolean saveNextReplica() {
LOG.warn("Exception saving replica " + replicaState, ioe); LOG.warn("Exception saving replica " + replicaState, ioe);
} finally { } finally {
if (!succeeded && replicaState != null) { if (!succeeded && replicaState != null) {
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
lazyWriteReplicaTracker.reenqueueReplica(replicaState); lazyWriteReplicaTracker.reenqueueReplica(replicaState);
} }
} }

View File

@ -245,14 +245,18 @@ void checkDirs() throws DiskErrorException {
} }
} }
void getVolumeMap(ReplicaMap volumeMap) throws IOException { void getVolumeMap(ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
throws IOException {
for(BlockPoolSlice s : bpSlices.values()) { for(BlockPoolSlice s : bpSlices.values()) {
s.getVolumeMap(volumeMap); s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
} }
} }
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException { void getVolumeMap(String bpid, ReplicaMap volumeMap,
getBlockPoolSlice(bpid).getVolumeMap(volumeMap); final LazyWriteReplicaTracker lazyWriteReplicaMap)
throws IOException {
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
} }
@Override @Override

View File

@ -119,7 +119,10 @@ long getRemaining() throws IOException {
return remaining; return remaining;
} }
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException { void getAllVolumesMap(final String bpid,
final ReplicaMap volumeMap,
final LazyWriteReplicaTracker lazyWriteReplicaMap)
throws IOException {
long totalStartTime = Time.monotonicNow(); long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList( final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>()); new ArrayList<IOException>());
@ -131,7 +134,7 @@ public void run() {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "..."); bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
v.getVolumeMap(bpid, volumeMap); v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
long timeTaken = Time.monotonicNow() - startTime; long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool" FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms"); + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
@ -160,17 +163,6 @@ public void run() {
+ totalTimeTaken + "ms"); + totalTimeTaken + "ms");
} }
void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
throws IOException {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
" on volume " + volume + "...");
long startTime = Time.monotonicNow();
volume.getVolumeMap(bpid, volumeMap);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
" on volume " + volume + ": " + timeTaken + "ms");
}
/** /**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
* volumes from the active list that result in a DiskErrorException. * volumes from the active list that result in a DiskErrorException.

View File

@ -265,7 +265,9 @@ public void testLazyPersistBlocksAreSaved()
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK); LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job // Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
LOG.info("Verifying copy was saved to lazyPersist/");
// Make sure that there is a saved copy of the replica on persistent // Make sure that there is a saved copy of the replica on persistent
// storage. // storage.
@ -330,23 +332,52 @@ public void testRamDiskEviction()
ensureFileReplicasOnStorageType(path1, DEFAULT); ensureFileReplicasOnStorageType(path1, DEFAULT);
} }
/**
* TODO: Stub test, to be completed.
* Verify that checksum computation is skipped for files written to memory.
*/
@Test (timeout=300000) @Test (timeout=300000)
public void testChecksumIsSkipped() public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(REPL_FACTOR,
new StorageType[] {RAM_DISK, DEFAULT }, -1); new StorageType[] {RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true); makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Verify checksum was not computed. // Sleep for a short time to allow the lazy writer thread to do its job.
// However the block replica should not be evicted from RAM_DISK yet.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
LOG.info("Restarting the DataNode");
cluster.restartDataNode(0, true);
cluster.waitActive();
// Ensure that the replica is now on persistent storage.
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
/**
 * Verify that a replica on transient storage which was never persisted
 * (the lazy writer is stopped before the file is written) remains on
 * RAM_DISK after a DataNode restart, i.e. restart recovery does not
 * fabricate a persistent copy.
 */
@Test (timeout=300000)
public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
new StorageType[] {RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
// Halt background persistence so the replica stays only on RAM_DISK.
stopLazyWriter(cluster.getDataNodes().get(0));
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
LOG.info("Restarting the DataNode");
cluster.restartDataNode(0, true);
cluster.waitActive();
// Ensure that the replica is still on transient storage.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
} }
// ---- Utility functions for all test cases ------------------------------- // ---- Utility functions for all test cases -------------------------------
@ -443,4 +474,10 @@ private LocatedBlocks ensureFileReplicasOnStorageType(
return locatedBlocks; return locatedBlocks;
} }
/**
 * Halt the lazy writer daemon on the given DataNode so that replicas on
 * transient storage are no longer persisted in the background.
 */
private void stopLazyWriter(DataNode dn) {
  FsDatasetImpl dataset = (FsDatasetImpl) dn.getFSDataset();
  FsDatasetImpl.LazyWriter writer =
      (FsDatasetImpl.LazyWriter) dataset.lazyWriter.getRunnable();
  writer.stop();
}
} }