From 34f28eaff9c4386dbfffdb9fc7640c32dae01eb6 Mon Sep 17 00:00:00 2001 From: arp Date: Wed, 3 Sep 2014 13:53:01 -0700 Subject: [PATCH] HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal) Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt --- .../fsdataset/impl/FsDatasetImpl.java | 214 +++++++++--------- .../impl/LazyWriteReplicaTracker.java | 52 +++-- 2 files changed, 137 insertions(+), 129 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 797dcc647a4..8d93dccc686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -996,83 +996,6 @@ class FsDatasetImpl implements FsDatasetSpi { } } - /** - * Attempt to evict one or more transient block replicas we have at least - * spaceNeeded bytes free. - * - * @return true if we were able to free up at least spaceNeeded bytes, false - * otherwise. - */ - private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded) - throws IOException { - - boolean isAvailable = false; - - LOG.info("Attempting to evict blocks from transient storage"); - - // Reverse the map so we can iterate in order of replica creation times, - // evicting oldest replicas one at a time until we have sufficient space. - TreeMultimap lruMap = - lazyWriteReplicaTracker.getLruMap(); - int blocksEvicted = 0; - - // TODO: It is really inefficient to do this with the Object lock held! - // TODO: This logic is here just for prototyping. - // TODO: We should replace it with proactive discard when ram_disk free space - // TODO: falls below a low watermark. That way we avoid fs operations on the - // TODO: hot path with the lock held. - synchronized (this) { - long currentTime = System.currentTimeMillis() / 1000; - for (Map.Entry entry : lruMap.entries()) { - LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue(); - LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId + - "; block LMT=" + entry.getKey() + - "; currentTime=" + currentTime); - ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId); - Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); - File blockFile = replicaInfo.getBlockFile(); - File metaFile = replicaInfo.getMetaFile(); - long used = blockFile.length() + metaFile.length(); - lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false); - - // Move the persisted replica to the finalized directory of - // the target volume. - BlockPoolSlice bpSlice = - lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid); - File newBlockFile = bpSlice.activateSavedReplica( - replicaInfo, lazyWriteReplica.savedBlockFile); - - ReplicaInfo newReplicaInfo = - new FinalizedReplica(replicaInfo.getBlockId(), - replicaInfo.getBytesOnDisk(), - replicaInfo.getGenerationStamp(), - lazyWriteReplica.lazyPersistVolume, - newBlockFile.getParentFile()); - - // Update the volumeMap entry. This removes the old entry. - volumeMap.add(bpid, newReplicaInfo); - - // Remove the old replicas. - blockFile.delete(); - metaFile.delete(); - ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used); - ++blocksEvicted; - - if (replicaInfo.getVolume().getAvailable() > spaceNeeded) { - LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block"); - isAvailable = true; - break; - } - - if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) { - break; - } - } - } - - return isAvailable; - } - @Override // FsDatasetSpi public synchronized ReplicaInPipeline createRbw(StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) throws IOException { @@ -1095,13 +1018,7 @@ class FsDatasetImpl implements FsDatasetSpi { } } catch (DiskOutOfSpaceException de) { if (allowLazyPersist) { - if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) { - // Eviction did not work, we'll just fallback to DEFAULT storage. - LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() + - " bytes for new block. Will fallback to DEFAULT " + - "storage"); - allowLazyPersist = false; - } + allowLazyPersist = false; continue; } throw de; @@ -2376,20 +2293,26 @@ class FsDatasetImpl implements FsDatasetSpi { class LazyWriter implements Runnable { private volatile boolean shouldRun = true; final int checkpointerInterval; + final long estimateBlockSize; + + public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10; + public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3; + public LazyWriter(final int checkpointerInterval) { this.checkpointerInterval = checkpointerInterval; + this.estimateBlockSize = conf.getLongBytes( + DFSConfigKeys.DFS_BLOCK_SIZE_KEY, + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); } private void moveReplicaToNewVolume(String bpid, long blockId) throws IOException { - LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid); - FsVolumeImpl targetVolume; ReplicaInfo replicaInfo; - synchronized (this) { + synchronized (FsDatasetImpl.this) { replicaInfo = volumeMap.get(bpid, blockId); if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) { @@ -2403,20 +2326,20 @@ class FsDatasetImpl implements FsDatasetSpi { // Pick a target volume for the block. targetVolume = volumes.getNextVolume( StorageType.DEFAULT, replicaInfo.getNumBytes()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid); - } + if (LOG.isDebugEnabled()) { + LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid); + } - lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume); - File savedBlockFile = targetVolume.getBlockPoolSlice(bpid) - .lazyPersistReplica(replicaInfo); - lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile); + lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume); + File savedBlockFile = targetVolume.getBlockPoolSlice(bpid) + .lazyPersistReplica(replicaInfo); + lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile); - if (LOG.isDebugEnabled()) { - LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + - " to file " + savedBlockFile); + if (LOG.isDebugEnabled()) { + LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid + + " to file " + savedBlockFile); + } } } @@ -2430,28 +2353,100 @@ class FsDatasetImpl implements FsDatasetSpi { boolean succeeded = false; try { - synchronized (this) { - replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist(); - if (replicaState == null) { - return false; - } + replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist(); + if (replicaState != null) { + // Move the replica outside the lock. + moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId); } - - // Move the replica outside the lock. - moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId); succeeded = true; } catch(IOException ioe) { LOG.warn("Exception saving replica " + replicaState, ioe); } finally { if (!succeeded && replicaState != null) { LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it."); - lazyWriteReplicaTracker.reenqueueReplica(replicaState); + lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState); } } return succeeded; } + private boolean transientFreeSpaceBelowThreshold() throws IOException { + long free = 0; + long capacity = 0; + + // Don't worry about fragmentation for now. We don't expect more than one + // transient volume per DN. + for (FsVolumeImpl v : volumes.volumes) { + if (v.isTransientStorage()) { + capacity += v.getCapacity(); + free += v.getAvailable(); + } + } + + if (capacity == 0) { + return false; + } + + int percentFree = (int) (free * 100 / capacity); + return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT || + free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS); + } + + /** + * Attempt to evict one or more transient block replicas we have at least + * spaceNeeded bytes free. + */ + private synchronized void evictBlocks() throws IOException { + int iterations = 0; + + LazyWriteReplicaTracker.ReplicaState replicaState = + lazyWriteReplicaTracker.getNextCandidateForEviction(); + + while (replicaState != null && + iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION & + transientFreeSpaceBelowThreshold()) { + if (LOG.isDebugEnabled()) { + LOG.info("Evicting block " + replicaState); + } + ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId); + Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); + File blockFile = replicaInfo.getBlockFile(); + File metaFile = replicaInfo.getMetaFile(); + long blockFileUsed = blockFile.length(); + long metaFileUsed = metaFile.length(); + lazyWriteReplicaTracker.discardReplica(replicaState, false); + + // Move the replica from lazyPersist/ to finalized/ on target volume + BlockPoolSlice bpSlice = + replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid); + File newBlockFile = bpSlice.activateSavedReplica( + replicaInfo, replicaState.savedBlockFile); + + ReplicaInfo newReplicaInfo = + new FinalizedReplica(replicaInfo.getBlockId(), + replicaInfo.getBytesOnDisk(), + replicaInfo.getGenerationStamp(), + replicaState.lazyPersistVolume, + newBlockFile.getParentFile()); + + // Update the volumeMap entry. This removes the old entry. + volumeMap.add(replicaState.bpid, newReplicaInfo); + + // Remove the old replicas from transient storage. + if (blockFile.delete() || !blockFile.exists()) { + ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed); + } + if (metaFile.delete() || !metaFile.exists()) { + ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed); + } + + // If deletion failed then the directory scanner will cleanup the blocks + // eventually. + replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction(); + } + } + @Override public void run() { int numSuccessiveFailures = 0; @@ -2459,11 +2454,12 @@ class FsDatasetImpl implements FsDatasetSpi { while (fsRunning && shouldRun) { try { numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1); + evictBlocks(); // Sleep if we have no more work to do or if it looks like we are not // making any forward progress. This is to ensure that if all persist // operations are failing we don't keep retrying them in a tight loop. - if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) { + if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) { Thread.sleep(checkpointerInterval * 1000); numSuccessiveFailures = 0; } @@ -2471,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi { LOG.info("LazyWriter was interrupted, exiting"); break; } catch (Exception e) { - LOG.error("Ignoring exception in LazyWriter:", e); + LOG.warn("Ignoring exception in LazyWriter:", e); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java index 222b63aad7f..9f020c4ab9b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java @@ -104,30 +104,23 @@ class LazyWriteReplicaTracker { /** * Queue of replicas that need to be written to disk. + * Stale entries are GC'd by dequeueNextReplicaToPersist. */ final Queue replicasNotPersisted; /** - * A map of blockId to persist complete time for transient blocks. This allows - * us to evict LRU blocks from transient storage. Protected by 'this' - * Object lock. + * Queue of replicas in the order in which they were persisted. + * We'll dequeue them in the same order. + * We can improve the eviction scheme later. + * Stale entries are GC'd by getNextCandidateForEviction. */ - final Map replicasPersisted; + final Queue replicasPersisted; LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) { this.fsDataset = fsDataset; replicaMaps = new HashMap>(); replicasNotPersisted = new LinkedList(); - replicasPersisted = new HashMap(); - } - - TreeMultimap getLruMap() { - // TODO: This can be made more efficient. - TreeMultimap reversedMap = TreeMultimap.create(); - for (Map.Entry entry : replicasPersisted.entrySet()) { - reversedMap.put(entry.getValue(), entry.getKey()); - } - return reversedMap; + replicasPersisted = new LinkedList(); } synchronized void addReplica(String bpid, long blockId, @@ -171,7 +164,8 @@ class LazyWriteReplicaTracker { // one. replicasNotPersisted.remove(replicaState); } - replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000); + + replicasPersisted.add(replicaState); } synchronized ReplicaState dequeueNextReplicaToPersist() { @@ -188,14 +182,36 @@ class LazyWriteReplicaTracker { return null; } - synchronized void reenqueueReplica(final ReplicaState replicaState) { + synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) { replicasNotPersisted.add(replicaState); } + synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) { + replicasPersisted.add(replicaState); + } + synchronized int numReplicasNotPersisted() { return replicasNotPersisted.size(); } + synchronized ReplicaState getNextCandidateForEviction() { + while (replicasPersisted.size() != 0) { + ReplicaState replicaState = replicasPersisted.remove(); + Map replicaMap = replicaMaps.get(replicaState.bpid); + + if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) { + return replicaState; + } + + // The replica no longer exists, look for the next one. + } + return null; + } + + void discardReplica(ReplicaState replicaState, boolean force) { + discardReplica(replicaState.bpid, replicaState.blockId, force); + } + synchronized void discardReplica( final String bpid, final long blockId, boolean force) { Map map = replicaMaps.get(bpid); @@ -221,9 +237,5 @@ class LazyWriteReplicaTracker { } map.remove(blockId); - replicasPersisted.remove(replicaState); - - // Leave the replica in replicasNotPersisted if its present. - // dequeueNextReplicaToPersist will GC it eventually. } }