HDFS-6977. Delete all copies when a block is deleted from the block space. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
This commit is contained in:
arp 2014-09-08 10:35:54 -07:00 committed by Jitendra Pandey
parent 21046d8310
commit 3abf34af9b
4 changed files with 53 additions and 30 deletions

View File

@ -270,17 +270,16 @@ File addBlock(Block b, File f) throws IOException {
* Save the given replica to persistent storage. * Save the given replica to persistent storage.
* *
* @param replicaInfo * @param replicaInfo
* @return The saved block file. * @return The saved meta and block files, in that order.
* @throws IOException * @throws IOException
*/ */
File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException { File[] lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) { if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir); FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
} }
File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir); File targetFiles[] = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
File blockFile = Block.metaToBlockFile(metaFile); dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length()); return targetFiles;
return blockFile;
} }
/** /**
@ -484,7 +483,7 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir,
if (newReplica.getVolume().isTransientStorage()) { if (newReplica.getVolume().isTransientStorage()) {
lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume()); lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
} else { } else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, true); lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
} }
} }
} }

View File

@ -707,10 +707,10 @@ static File moveBlockFiles(Block b, File srcfile, File destdir)
/** /**
* Copy the block and meta files for the given block from the given * Copy the block and meta files for the given block from the given
* @return the new meta file. * @return the new meta and block files.
* @throws IOException * @throws IOException
*/ */
static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot) static File[] copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
throws IOException { throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId()); final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
final File dstFile = new File(destDir, replicaInfo.getBlockName()); final File dstFile = new File(destDir, replicaInfo.getBlockName());
@ -731,7 +731,7 @@ static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta); LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile); LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
} }
return dstMeta; return new File[] {dstMeta, dstFile};
} }
static private void truncateBlock(File blockFile, File metaFile, static private void truncateBlock(File blockFile, File metaFile,
@ -2332,13 +2332,14 @@ private void moveReplicaToNewVolume(String bpid, long blockId)
} }
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume); lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
File savedBlockFile = targetVolume.getBlockPoolSlice(bpid) File[] savedFiles = targetVolume.getBlockPoolSlice(bpid)
.lazyPersistReplica(replicaInfo); .lazyPersistReplica(replicaInfo);
lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile); lazyWriteReplicaTracker.recordEndLazyPersist(
bpid, blockId, savedFiles[0], savedFiles[1]);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
" to file " + savedBlockFile); " to file " + savedFiles[1]);
} }
} }
} }

View File

@ -48,6 +48,7 @@ static class ReplicaState implements Comparable<ReplicaState> {
* Persistent volume that holds or will hold the saved replica. * Persistent volume that holds or will hold the saved replica.
*/ */
FsVolumeImpl lazyPersistVolume; FsVolumeImpl lazyPersistVolume;
File savedMetaFile;
File savedBlockFile; File savedBlockFile;
ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) { ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
@ -56,9 +57,26 @@ static class ReplicaState implements Comparable<ReplicaState> {
this.transientVolume = transientVolume; this.transientVolume = transientVolume;
state = State.IN_MEMORY; state = State.IN_MEMORY;
lazyPersistVolume = null; lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null; savedBlockFile = null;
} }
void deleteSavedFiles() {
try {
if (savedBlockFile != null) {
savedBlockFile.delete();
savedBlockFile = null;
}
if (savedMetaFile != null) {
savedMetaFile.delete();
savedMetaFile = null;
}
} catch (Throwable t) {
// Ignore any exceptions.
}
}
@Override @Override
public String toString() { public String toString() {
return "[Bpid=" + bpid + ";blockId=" + blockId + "]"; return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
@ -144,7 +162,8 @@ synchronized void recordStartLazyPersist(
} }
synchronized void recordEndLazyPersist( synchronized void recordEndLazyPersist(
final String bpid, final long blockId, File savedBlockFile) { final String bpid, final long blockId,
final File savedMetaFile, final File savedBlockFile) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid); Map<Long, ReplicaState> map = replicaMaps.get(bpid);
ReplicaState replicaState = map.get(blockId); ReplicaState replicaState = map.get(blockId);
@ -153,6 +172,7 @@ synchronized void recordEndLazyPersist(
bpid + "; blockId=" + blockId); bpid + "; blockId=" + blockId);
} }
replicaState.state = State.LAZY_PERSIST_COMPLETE; replicaState.state = State.LAZY_PERSIST_COMPLETE;
replicaState.savedMetaFile = savedMetaFile;
replicaState.savedBlockFile = savedBlockFile; replicaState.savedBlockFile = savedBlockFile;
if (replicasNotPersisted.peek() == replicaState) { if (replicasNotPersisted.peek() == replicaState) {
@ -208,12 +228,22 @@ synchronized ReplicaState getNextCandidateForEviction() {
return null; return null;
} }
void discardReplica(ReplicaState replicaState, boolean force) { void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
discardReplica(replicaState.bpid, replicaState.blockId, force); discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
} }
/**
* Discard any state we are tracking for the given replica. This could mean
* the block is either deleted from the block space or the replica is no longer
* on transient storage.
*
* @param deleteSavedCopies true if we should delete the saved copies on
* persistent storage. This should be set by the
* caller when the block is no longer needed.
*/
synchronized void discardReplica( synchronized void discardReplica(
final String bpid, final long blockId, boolean force) { final String bpid, final long blockId,
boolean deleteSavedCopies) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid); Map<Long, ReplicaState> map = replicaMaps.get(bpid);
if (map == null) { if (map == null) {
@ -223,19 +253,12 @@ synchronized void discardReplica(
ReplicaState replicaState = map.get(blockId); ReplicaState replicaState = map.get(blockId);
if (replicaState == null) { if (replicaState == null) {
if (force) { return;
return;
}
throw new IllegalStateException("Unknown replica bpid=" +
bpid + "; blockId=" + blockId);
} }
if (replicaState.state != State.LAZY_PERSIST_COMPLETE && !force) { if (deleteSavedCopies) {
throw new IllegalStateException("Discarding replica without " + replicaState.deleteSavedFiles();
"saving it to disk bpid=" + bpid + "; blockId=" + blockId);
} }
map.remove(blockId); map.remove(blockId);
} }
} }

View File

@ -394,7 +394,7 @@ public void testLazyPersistBlocksAreSaved()
public void testRamDiskEviction() public void testRamDiskEviction()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(REPL_FACTOR,
new StorageType[] {RAM_DISK, DEFAULT }, new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta. (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@ -415,7 +415,7 @@ public void testRamDiskEviction()
// Make sure that the second file's block replica is on RAM_DISK, whereas // Make sure that the second file's block replica is on RAM_DISK, whereas
// the original file's block replica is now on disk. // the original file's block replica is now on disk.
// ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT); ensureFileReplicasOnStorageType(path1, DEFAULT);
} }