diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index b0fb070ada3..881cb63ab6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -24,3 +24,6 @@
     HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
     (Arpit Agarwal)
 
+    HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+    (Arpit Agarwal)
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 31a254bf22e..1313fef7ed5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -271,10 +271,13 @@ File addBlock(Block b, File f) throws IOException {
     return blockFile;
   }
 
-  File lazyPersistReplica(Block b, File f) throws IOException {
-    File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
-    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
-    dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+  File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+    }
+    File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File blockFile = Block.metaToBlockFile(metaFile);
+    dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length());
     return blockFile;
   }
 
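Note on the change above: lazyPersistReplica no longer writes the copy directly into lazypersistDir; FsDatasetImpl.copyBlockFiles (below) places it under a block-id derived subdirectory via DatanodeUtil.idToBlockDir, which is also why TestLazyPersistFiles switches to idToBlockDir when locating persisted copies. A minimal self-contained sketch of that layout idea follows; the two-level scheme is assumed, and the shift and mask values are illustrative, not copied from DatanodeUtil:

    import java.io.File;

    class BlockDirLayoutSketch {
      // Derive a two-level subdirectory from bits of the block id so that no
      // single directory accumulates an unbounded number of block files.
      // The shift/mask values are assumptions for illustration only.
      static File idToBlockDir(File root, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0xff);
        int d2 = (int) ((blockId >> 8) & 0xff);
        return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
      }

      public static void main(String[] args) {
        // Prints <root>/subdir52/subdir86 for block id 0x12345678 under this scheme.
        System.out.println(idToBlockDir(new File("/data/lazypersist"), 0x12345678L));
      }
    }
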
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8643d6bc08f..22f626cc654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -565,28 +566,33 @@ static File moveBlockFiles(Block b, File srcfile, File destdir)
     return dstfile;
   }
 
-  static File copyBlockFiles(Block b, File srcfile, File destdir)
+  /**
+   * Copy the block and meta files for the given block to the given destination.
+   * @return the new meta file.
+   * @throws IOException
+   */
+  static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
       throws IOException {
-    final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
-    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
+    final File dstFile = new File(destDir, replicaInfo.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
+    final File srcMeta = replicaInfo.getMetaFile();
+    final File srcFile = replicaInfo.getBlockFile();
     try {
-      FileUtils.copyFile(srcmeta, dstmeta);
+      FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
-      throw new IOException("Failed to copy meta file for " + b
-          + " from " + srcmeta + " to " + dstmeta, e);
+      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcfile, dstfile);
+      FileUtils.copyFile(srcFile, dstFile);
     } catch (IOException e) {
-      throw new IOException("Failed to copy block file for " + b
-          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
-          + " and " + srcfile + " to " + dstfile);
+      LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
+      LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
     }
-    return dstfile;
+    return dstMeta;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1174,10 +1180,6 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
       if (v.isTransientStorage()) {
         lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
-
-        // Schedule a checkpoint.
-        ((LazyWriter) lazyWriter.getRunnable())
-            .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -2188,32 +2190,12 @@ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
       nbytes, flags);
   }
 
-  private static class BlockIdPair {
-    final String bpid;
-    final long blockId;
-
-    BlockIdPair(final String bpid, final long blockId) {
-      this.bpid = bpid;
-      this.blockId = blockId;
-    }
-  }
-
-  private class LazyWriter implements Runnable {
+  class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
-    final private Queue<BlockIdPair> blocksPendingCheckpoint;
 
     public LazyWriter(final int checkpointerInterval) {
       this.checkpointerInterval = checkpointerInterval;
-      blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
-    }
-
-    // Schedule a replica for writing to persistent storage.
-    public synchronized void addReplicaToLazyWriteQueue(
-        String bpid, long blockId) {
-      LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
-      blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
     }
 
     private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2221,76 +2203,85 @@ private void moveReplicaToNewVolume(String bpid, long blockId)
       LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
 
-      FsVolumeImpl targetVolume = null;
-      Block block = null;
-      File blockFile = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
 
       synchronized (this) {
-        block = getStoredBlock(bpid, blockId);
-        blockFile = getFile(bpid, blockId);
+        replicaInfo = volumeMap.get(bpid, blockId);
 
-        if (block == null) {
-          // The block was deleted before it could be checkpointed.
+        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+          // The block was either deleted before it could be checkpointed or
+          // it is already on persistent storage. This can occur if a second
+          // replica on persistent storage was found after the lazy write was
+          // scheduled.
           return;
         }
 
         // Pick a target volume for the block.
         targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, block.getNumBytes());
+            StorageType.DEFAULT, replicaInfo.getNumBytes());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       }
 
-      LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
       File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
-          .lazyPersistReplica(block, blockFile);
+          .lazyPersistReplica(replicaInfo);
       lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
-      LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-          " to file " + savedBlockFile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+            " to file " + savedBlockFile);
+      }
     }
 
     /**
      * Checkpoint a pending replica to persistent storage now.
+     * If we fail then move the replica to the end of the queue.
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      BlockIdPair blockIdPair = null;
-      int moreWorkThreshold = 0;
+      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      boolean succeeded = false;
 
       try {
         synchronized (this) {
-          // Dequeue the next replica waiting to be checkpointed.
-          blockIdPair = blocksPendingCheckpoint.poll();
-          if (blockIdPair == null) {
-            LOG.info("LazyWriter has no blocks to persist. " +
-                "Thread going to sleep.");
+          replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+          if (replicaState == null) {
             return false;
           }
         }
 
         // Move the replica outside the lock.
-        moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
-
+        moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        succeeded = true;
       } catch(IOException ioe) {
-        // If we failed, put the block on the queue and let a retry
-        // interval elapse before we try again so we don't try to keep
-        // checkpointing the same block in a tight loop.
-        synchronized (this) {
-          blocksPendingCheckpoint.add(blockIdPair);
-          ++moreWorkThreshold;
+        LOG.warn("Exception saving replica " + replicaState, ioe);
+      } finally {
+        if (!succeeded && replicaState != null) {
+          lazyWriteReplicaTracker.reenqueueReplica(replicaState);
         }
       }
 
-      synchronized (this) {
-        return blocksPendingCheckpoint.size() > moreWorkThreshold;
-      }
+      return succeeded;
     }
 
     @Override
     public void run() {
+      int numSuccessiveFailures = 0;
+
       while (fsRunning && shouldRun) {
         try {
-          if (!saveNextReplica()) {
+          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+
+          // Sleep if we have no more work to do or if it looks like we are not
+          // making any forward progress. This is to ensure that if all persist
+          // operations are failing we don't keep retrying them in a tight loop.
+          if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
             Thread.sleep(checkpointerInterval * 1000);
+            numSuccessiveFailures = 0;
           }
         } catch (InterruptedException e) {
           LOG.info("LazyWriter was interrupted, exiting");
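The reworked run() loop above replaces the per-failure requeue bookkeeping with a successive-failure counter: it backs off once the counter reaches numReplicasNotPersisted(), that is, once a full pass over the pending queue has made no progress. A self-contained sketch of that control flow follows; the queue, the simulated failures, and the >= comparison (the patch itself compares with == against the live tracker count) are simplifications for illustration:

    import java.util.ArrayDeque;
    import java.util.Queue;

    class LazyWriterBackoffSketch {
      static final long CHECKPOINT_INTERVAL_MSEC = 1000; // stands in for checkpointerInterval

      // Stand-in for dequeueNextReplicaToPersist() + moveReplicaToNewVolume():
      // a failed persist is re-enqueued at the tail, as reenqueueReplica() does.
      static boolean saveNextReplica(Queue<Long> pending) {
        Long blockId = pending.poll();
        if (blockId == null) {
          return false; // nothing to persist
        }
        if (blockId % 2 != 0) { // simulate a persist failure for odd ids
          pending.add(blockId);
          return false;
        }
        return true;
      }

      public static void main(String[] args) throws InterruptedException {
        Queue<Long> pending = new ArrayDeque<>();
        pending.add(1L); pending.add(2L); pending.add(3L);

        int numSuccessiveFailures = 0;
        for (int i = 0; i < 10; i++) { // bounded loop for the sketch
          numSuccessiveFailures = saveNextReplica(pending) ? 0 : numSuccessiveFailures + 1;

          // Back off once a whole pass over the remaining replicas has failed,
          // or when there is nothing left to do, instead of spinning.
          if (numSuccessiveFailures >= pending.size()) {
            Thread.sleep(CHECKPOINT_INTERVAL_MSEC);
            numSuccessiveFailures = 0;
          }
        }
      }
    }
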
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index d3c585d6f5d..85756b78cee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -255,22 +255,6 @@ void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
     getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
   }
 
-  /**
-   * Add replicas under the given directory to the volume map
-   * @param volumeMap the replicas map
-   * @param dir an input directory
-   * @param isFinalized true if the directory has finalized replicas;
-   *                    false if the directory has rbw replicas
-   * @throws IOException
-   */
-  void addToReplicasMap(String bpid, ReplicaMap volumeMap,
-      File dir, boolean isFinalized) throws IOException {
-    BlockPoolSlice bp = getBlockPoolSlice(bpid);
-    // TODO move this up
-    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-    bp.addToReplicasMap(volumeMap, dir, isFinalized);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index ae28f092124..222b63aad7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -19,12 +19,11 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 class LazyWriteReplicaTracker {
 
@@ -43,7 +42,7 @@ static class ReplicaState implements Comparable<ReplicaState> {
 
     /**
      * transient storage volume that holds the original replica.
      */
-    final FsVolumeImpl transientVolume;
+    final FsVolumeSpi transientVolume;
 
     /**
      * Persistent volume that holds or will hold the saved replica.
@@ -51,7 +50,7 @@ static class ReplicaState implements Comparable<ReplicaState> {
     FsVolumeImpl lazyPersistVolume;
     File savedBlockFile;
 
-    ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
       this.bpid = bpid;
       this.blockId = blockId;
       this.transientVolume = transientVolume;
@@ -60,6 +59,11 @@ static class ReplicaState implements Comparable<ReplicaState> {
       savedBlockFile = null;
     }
 
+    @Override
+    public String toString() {
+      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
+    }
+
     @Override
     public int hashCode() {
       return bpid.hashCode() ^ (int) blockId;
@@ -98,36 +102,44 @@ public int compareTo(ReplicaState other) {
    */
   final Map<String, Map<Long, ReplicaState>> replicaMaps;
 
+  /**
+   * Queue of replicas that need to be written to disk.
+   */
+  final Queue<ReplicaState> replicasNotPersisted;
+
   /**
    * A map of blockId to persist complete time for transient blocks. This allows
    * us to evict LRU blocks from transient storage. Protected by 'this'
    * Object lock.
    */
-  final Map<ReplicaState, Long> persistTimeMap;
+  final Map<ReplicaState, Long> replicasPersisted;
 
   LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
     this.fsDataset = fsDataset;
     replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    persistTimeMap = new HashMap<ReplicaState, Long>();
+    replicasNotPersisted = new LinkedList<ReplicaState>();
+    replicasPersisted = new HashMap<ReplicaState, Long>();
   }
 
   TreeMultimap<Long, ReplicaState> getLruMap() {
     // TODO: This can be made more efficient.
     TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
-    for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+    for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
       reversedMap.put(entry.getValue(), entry.getKey());
     }
     return reversedMap;
   }
 
   synchronized void addReplica(String bpid, long blockId,
-                               final FsVolumeImpl transientVolume) {
+                               final FsVolumeSpi transientVolume) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     if (map == null) {
       map = new HashMap<Long, ReplicaState>();
       replicaMaps.put(bpid, map);
     }
-    map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
+    map.put(blockId, replicaState);
+    replicasNotPersisted.add(replicaState);
   }
 
   synchronized void recordStartLazyPersist(
@@ -149,12 +161,49 @@ synchronized void recordEndLazyPersist(
     }
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
     replicaState.savedBlockFile = savedBlockFile;
-    persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+
+    if (replicasNotPersisted.peek() == replicaState) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Should never occur in practice as lazy writer always persists
+      // the replica at the head of the queue before moving to the next
+      // one.
+      replicasNotPersisted.remove(replicaState);
+    }
+    replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+  }
+
+  synchronized ReplicaState dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      ReplicaState replicaState = replicasNotPersisted.remove();
+      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+        return replicaState;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  synchronized void reenqueueReplica(final ReplicaState replicaState) {
+    replicasNotPersisted.add(replicaState);
+  }
+
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
+  }
 
   synchronized void discardReplica(
       final String bpid, final long blockId, boolean force) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
     ReplicaState replicaState = map.get(blockId);
 
     if (replicaState == null) {
@@ -172,6 +221,9 @@ synchronized void discardReplica(
     }
 
     map.remove(blockId);
-    persistTimeMap.remove(replicaState);
+    replicasPersisted.remove(replicaState);
+
+    // Leave the replica in replicasNotPersisted if it's present.
+    // dequeueNextReplicaToPersist will GC it eventually.
   }
 }
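The tracker above now owns the pending-persist queue that LazyWriter drains. Two points worth noting in review: discardReplica deliberately leaves stale entries in replicasNotPersisted, and dequeueNextReplicaToPersist skips them lazily; reenqueueReplica sends a failed replica to the back of the queue so one bad replica cannot starve the rest. A stripped-down sketch of those queue semantics, using hypothetical simplified types in place of ReplicaState and the per-bpid maps:

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    class TrackerQueueSketch {
      final Map<Long, String> replicaMap = new HashMap<>(); // blockId -> state
      final Queue<Long> notPersisted = new ArrayDeque<>();  // pending block ids

      synchronized void addReplica(long blockId) {
        replicaMap.put(blockId, "IN_MEMORY");
        notPersisted.add(blockId);
      }

      // Mirrors dequeueNextReplicaToPersist(): skip ids whose replica was
      // deleted after being queued; they are garbage-collected here rather
      // than eagerly removed by discardReplica().
      synchronized Long dequeueNextReplicaToPersist() {
        while (!notPersisted.isEmpty()) {
          Long blockId = notPersisted.remove();
          if (replicaMap.containsKey(blockId)) {
            return blockId;
          }
        }
        return null;
      }

      // Mirrors reenqueueReplica(): a failed persist goes to the back of
      // the queue so the lazy writer keeps making progress on the others.
      synchronized void reenqueueReplica(long blockId) {
        notPersisted.add(blockId);
      }

      // Mirrors discardReplica(): only the map entry is removed; any queue
      // entry is left behind for dequeueNextReplicaToPersist() to skip.
      synchronized void discardReplica(long blockId) {
        replicaMap.remove(blockId);
      }
    }
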
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index ddd71b1b5e1..af0e8acbae2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -40,7 +40,9 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,7 @@ public class TestLazyPersistFiles {
   static {
     ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static short REPL_FACTOR = 1;
@@ -68,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
 
   private MiniDFSCluster cluster;
@@ -283,8 +286,9 @@ public void testLazyPersistBlocksAreSaved()
     File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
 
     for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-      File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
-      if (persistedBlockFile.exists()) {
+      File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
         // Found a persisted copy for this block!
         boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
         assertThat(added, is(true));