HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)

This commit is contained in:
arp 2014-09-03 13:53:01 -07:00
parent 08a8f2be81
commit cb9b485075
3 changed files with 139 additions and 129 deletions

View File

@ -33,3 +33,5 @@
HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
Arpit Agarwal) Arpit Agarwal)
HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)

View File

@ -909,83 +909,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
} }
/**
* Attempt to evict one or more transient block replicas we have at least
* spaceNeeded bytes free.
*
* @return true if we were able to free up at least spaceNeeded bytes, false
* otherwise.
*/
private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
throws IOException {
boolean isAvailable = false;
LOG.info("Attempting to evict blocks from transient storage");
// Reverse the map so we can iterate in order of replica creation times,
// evicting oldest replicas one at a time until we have sufficient space.
TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
lazyWriteReplicaTracker.getLruMap();
int blocksEvicted = 0;
// TODO: It is really inefficient to do this with the Object lock held!
// TODO: This logic is here just for prototyping.
// TODO: We should replace it with proactive discard when ram_disk free space
// TODO: falls below a low watermark. That way we avoid fs operations on the
// TODO: hot path with the lock held.
synchronized (this) {
long currentTime = System.currentTimeMillis() / 1000;
for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
"; block LMT=" + entry.getKey() +
"; currentTime=" + currentTime);
ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
File blockFile = replicaInfo.getBlockFile();
File metaFile = replicaInfo.getMetaFile();
long used = blockFile.length() + metaFile.length();
lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
// Move the persisted replica to the finalized directory of
// the target volume.
BlockPoolSlice bpSlice =
lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
File newBlockFile = bpSlice.activateSavedReplica(
replicaInfo, lazyWriteReplica.savedBlockFile);
ReplicaInfo newReplicaInfo =
new FinalizedReplica(replicaInfo.getBlockId(),
replicaInfo.getBytesOnDisk(),
replicaInfo.getGenerationStamp(),
lazyWriteReplica.lazyPersistVolume,
newBlockFile.getParentFile());
// Update the volumeMap entry. This removes the old entry.
volumeMap.add(bpid, newReplicaInfo);
// Remove the old replicas.
blockFile.delete();
metaFile.delete();
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
++blocksEvicted;
if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
isAvailable = true;
break;
}
if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
break;
}
}
}
return isAvailable;
}
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType, public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException { ExtendedBlock b, boolean allowLazyPersist) throws IOException {
@ -1008,13 +931,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
} catch (DiskOutOfSpaceException de) { } catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) { if (allowLazyPersist) {
if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
// Eviction did not work, we'll just fallback to DEFAULT storage.
LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
" bytes for new block. Will fallback to DEFAULT " +
"storage");
allowLazyPersist = false; allowLazyPersist = false;
}
continue; continue;
} }
throw de; throw de;
@ -2267,20 +2184,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
class LazyWriter implements Runnable { class LazyWriter implements Runnable {
private volatile boolean shouldRun = true; private volatile boolean shouldRun = true;
final int checkpointerInterval; final int checkpointerInterval;
final long estimateBlockSize;
public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
public LazyWriter(final int checkpointerInterval) { public LazyWriter(final int checkpointerInterval) {
this.checkpointerInterval = checkpointerInterval; this.checkpointerInterval = checkpointerInterval;
this.estimateBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
} }
private void moveReplicaToNewVolume(String bpid, long blockId) private void moveReplicaToNewVolume(String bpid, long blockId)
throws IOException { throws IOException {
LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
FsVolumeImpl targetVolume; FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo; ReplicaInfo replicaInfo;
synchronized (this) { synchronized (FsDatasetImpl.this) {
replicaInfo = volumeMap.get(bpid, blockId); replicaInfo = volumeMap.get(bpid, blockId);
if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) { if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
@ -2294,7 +2217,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Pick a target volume for the block. // Pick a target volume for the block.
targetVolume = volumes.getNextVolume( targetVolume = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes()); StorageType.DEFAULT, replicaInfo.getNumBytes());
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid); LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
@ -2310,6 +2232,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" to file " + savedBlockFile); " to file " + savedBlockFile);
} }
} }
}
/** /**
* Checkpoint a pending replica to persistent storage now. * Checkpoint a pending replica to persistent storage now.
@ -2321,28 +2244,100 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean succeeded = false; boolean succeeded = false;
try { try {
synchronized (this) {
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist(); replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
if (replicaState == null) { if (replicaState != null) {
return false;
}
}
// Move the replica outside the lock. // Move the replica outside the lock.
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId); moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
}
succeeded = true; succeeded = true;
} catch(IOException ioe) { } catch(IOException ioe) {
LOG.warn("Exception saving replica " + replicaState, ioe); LOG.warn("Exception saving replica " + replicaState, ioe);
} finally { } finally {
if (!succeeded && replicaState != null) { if (!succeeded && replicaState != null) {
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it."); LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
lazyWriteReplicaTracker.reenqueueReplica(replicaState); lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
} }
} }
return succeeded; return succeeded;
} }
private boolean transientFreeSpaceBelowThreshold() throws IOException {
long free = 0;
long capacity = 0;
// Don't worry about fragmentation for now. We don't expect more than one
// transient volume per DN.
for (FsVolumeImpl v : volumes.volumes) {
if (v.isTransientStorage()) {
capacity += v.getCapacity();
free += v.getAvailable();
}
}
if (capacity == 0) {
return false;
}
int percentFree = (int) (free * 100 / capacity);
return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
}
/**
* Attempt to evict one or more transient block replicas we have at least
* spaceNeeded bytes free.
*/
private synchronized void evictBlocks() throws IOException {
int iterations = 0;
LazyWriteReplicaTracker.ReplicaState replicaState =
lazyWriteReplicaTracker.getNextCandidateForEviction();
while (replicaState != null &&
iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &
transientFreeSpaceBelowThreshold()) {
if (LOG.isDebugEnabled()) {
LOG.info("Evicting block " + replicaState);
}
ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
File blockFile = replicaInfo.getBlockFile();
File metaFile = replicaInfo.getMetaFile();
long blockFileUsed = blockFile.length();
long metaFileUsed = metaFile.length();
lazyWriteReplicaTracker.discardReplica(replicaState, false);
// Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice =
replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
File newBlockFile = bpSlice.activateSavedReplica(
replicaInfo, replicaState.savedBlockFile);
ReplicaInfo newReplicaInfo =
new FinalizedReplica(replicaInfo.getBlockId(),
replicaInfo.getBytesOnDisk(),
replicaInfo.getGenerationStamp(),
replicaState.lazyPersistVolume,
newBlockFile.getParentFile());
// Update the volumeMap entry. This removes the old entry.
volumeMap.add(replicaState.bpid, newReplicaInfo);
// Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
}
if (metaFile.delete() || !metaFile.exists()) {
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
}
// If deletion failed then the directory scanner will cleanup the blocks
// eventually.
replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
}
}
@Override @Override
public void run() { public void run() {
int numSuccessiveFailures = 0; int numSuccessiveFailures = 0;
@ -2350,11 +2345,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (fsRunning && shouldRun) { while (fsRunning && shouldRun) {
try { try {
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1); numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
evictBlocks();
// Sleep if we have no more work to do or if it looks like we are not // Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist // making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop. // operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) { if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000); Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0; numSuccessiveFailures = 0;
} }
@ -2362,7 +2358,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("LazyWriter was interrupted, exiting"); LOG.info("LazyWriter was interrupted, exiting");
break; break;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Ignoring exception in LazyWriter:", e); LOG.warn("Ignoring exception in LazyWriter:", e);
} }
} }
} }

View File

@ -104,30 +104,23 @@ class LazyWriteReplicaTracker {
/** /**
* Queue of replicas that need to be written to disk. * Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/ */
final Queue<ReplicaState> replicasNotPersisted; final Queue<ReplicaState> replicasNotPersisted;
/** /**
* A map of blockId to persist complete time for transient blocks. This allows * Queue of replicas in the order in which they were persisted.
* us to evict LRU blocks from transient storage. Protected by 'this' * We'll dequeue them in the same order.
* Object lock. * We can improve the eviction scheme later.
* Stale entries are GC'd by getNextCandidateForEviction.
*/ */
final Map<ReplicaState, Long> replicasPersisted; final Queue<ReplicaState> replicasPersisted;
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) { LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
this.fsDataset = fsDataset; this.fsDataset = fsDataset;
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>(); replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
replicasNotPersisted = new LinkedList<ReplicaState>(); replicasNotPersisted = new LinkedList<ReplicaState>();
replicasPersisted = new HashMap<ReplicaState, Long>(); replicasPersisted = new LinkedList<ReplicaState>();
}
TreeMultimap<Long, ReplicaState> getLruMap() {
// TODO: This can be made more efficient.
TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
reversedMap.put(entry.getValue(), entry.getKey());
}
return reversedMap;
} }
synchronized void addReplica(String bpid, long blockId, synchronized void addReplica(String bpid, long blockId,
@ -171,7 +164,8 @@ class LazyWriteReplicaTracker {
// one. // one.
replicasNotPersisted.remove(replicaState); replicasNotPersisted.remove(replicaState);
} }
replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
replicasPersisted.add(replicaState);
} }
synchronized ReplicaState dequeueNextReplicaToPersist() { synchronized ReplicaState dequeueNextReplicaToPersist() {
@ -188,14 +182,36 @@ class LazyWriteReplicaTracker {
return null; return null;
} }
synchronized void reenqueueReplica(final ReplicaState replicaState) { synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
replicasNotPersisted.add(replicaState); replicasNotPersisted.add(replicaState);
} }
synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
replicasPersisted.add(replicaState);
}
synchronized int numReplicasNotPersisted() { synchronized int numReplicasNotPersisted() {
return replicasNotPersisted.size(); return replicasNotPersisted.size();
} }
synchronized ReplicaState getNextCandidateForEviction() {
while (replicasPersisted.size() != 0) {
ReplicaState replicaState = replicasPersisted.remove();
Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
return replicaState;
}
// The replica no longer exists, look for the next one.
}
return null;
}
void discardReplica(ReplicaState replicaState, boolean force) {
discardReplica(replicaState.bpid, replicaState.blockId, force);
}
synchronized void discardReplica( synchronized void discardReplica(
final String bpid, final long blockId, boolean force) { final String bpid, final long blockId, boolean force) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid); Map<Long, ReplicaState> map = replicaMaps.get(bpid);
@ -221,9 +237,5 @@ class LazyWriteReplicaTracker {
} }
map.remove(blockId); map.remove(blockId);
replicasPersisted.remove(replicaState);
// Leave the replica in replicasNotPersisted if its present.
// dequeueNextReplicaToPersist will GC it eventually.
} }
} }