HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
This commit is contained in:
parent
225ffdb6d8
commit
21046d8310
|
@ -996,83 +996,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Attempt to evict one or more transient block replicas we have at least
|
|
||||||
* spaceNeeded bytes free.
|
|
||||||
*
|
|
||||||
* @return true if we were able to free up at least spaceNeeded bytes, false
|
|
||||||
* otherwise.
|
|
||||||
*/
|
|
||||||
private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
boolean isAvailable = false;
|
|
||||||
|
|
||||||
LOG.info("Attempting to evict blocks from transient storage");
|
|
||||||
|
|
||||||
// Reverse the map so we can iterate in order of replica creation times,
|
|
||||||
// evicting oldest replicas one at a time until we have sufficient space.
|
|
||||||
TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
|
|
||||||
lazyWriteReplicaTracker.getLruMap();
|
|
||||||
int blocksEvicted = 0;
|
|
||||||
|
|
||||||
// TODO: It is really inefficient to do this with the Object lock held!
|
|
||||||
// TODO: This logic is here just for prototyping.
|
|
||||||
// TODO: We should replace it with proactive discard when ram_disk free space
|
|
||||||
// TODO: falls below a low watermark. That way we avoid fs operations on the
|
|
||||||
// TODO: hot path with the lock held.
|
|
||||||
synchronized (this) {
|
|
||||||
long currentTime = System.currentTimeMillis() / 1000;
|
|
||||||
for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
|
|
||||||
LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
|
|
||||||
LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
|
|
||||||
"; block LMT=" + entry.getKey() +
|
|
||||||
"; currentTime=" + currentTime);
|
|
||||||
ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
|
|
||||||
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
|
||||||
File blockFile = replicaInfo.getBlockFile();
|
|
||||||
File metaFile = replicaInfo.getMetaFile();
|
|
||||||
long used = blockFile.length() + metaFile.length();
|
|
||||||
lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
|
|
||||||
|
|
||||||
// Move the persisted replica to the finalized directory of
|
|
||||||
// the target volume.
|
|
||||||
BlockPoolSlice bpSlice =
|
|
||||||
lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
|
|
||||||
File newBlockFile = bpSlice.activateSavedReplica(
|
|
||||||
replicaInfo, lazyWriteReplica.savedBlockFile);
|
|
||||||
|
|
||||||
ReplicaInfo newReplicaInfo =
|
|
||||||
new FinalizedReplica(replicaInfo.getBlockId(),
|
|
||||||
replicaInfo.getBytesOnDisk(),
|
|
||||||
replicaInfo.getGenerationStamp(),
|
|
||||||
lazyWriteReplica.lazyPersistVolume,
|
|
||||||
newBlockFile.getParentFile());
|
|
||||||
|
|
||||||
// Update the volumeMap entry. This removes the old entry.
|
|
||||||
volumeMap.add(bpid, newReplicaInfo);
|
|
||||||
|
|
||||||
// Remove the old replicas.
|
|
||||||
blockFile.delete();
|
|
||||||
metaFile.delete();
|
|
||||||
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
|
|
||||||
++blocksEvicted;
|
|
||||||
|
|
||||||
if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
|
|
||||||
LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
|
|
||||||
isAvailable = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return isAvailable;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
||||||
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
|
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
|
||||||
|
@ -1095,13 +1018,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
} catch (DiskOutOfSpaceException de) {
|
} catch (DiskOutOfSpaceException de) {
|
||||||
if (allowLazyPersist) {
|
if (allowLazyPersist) {
|
||||||
if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
|
|
||||||
// Eviction did not work, we'll just fallback to DEFAULT storage.
|
|
||||||
LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
|
|
||||||
" bytes for new block. Will fallback to DEFAULT " +
|
|
||||||
"storage");
|
|
||||||
allowLazyPersist = false;
|
allowLazyPersist = false;
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
throw de;
|
throw de;
|
||||||
|
@ -2376,20 +2293,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
class LazyWriter implements Runnable {
|
class LazyWriter implements Runnable {
|
||||||
private volatile boolean shouldRun = true;
|
private volatile boolean shouldRun = true;
|
||||||
final int checkpointerInterval;
|
final int checkpointerInterval;
|
||||||
|
final long estimateBlockSize;
|
||||||
|
|
||||||
|
public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
|
||||||
|
public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
|
||||||
|
|
||||||
|
|
||||||
public LazyWriter(final int checkpointerInterval) {
|
public LazyWriter(final int checkpointerInterval) {
|
||||||
this.checkpointerInterval = checkpointerInterval;
|
this.checkpointerInterval = checkpointerInterval;
|
||||||
|
this.estimateBlockSize = conf.getLongBytes(
|
||||||
|
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveReplicaToNewVolume(String bpid, long blockId)
|
private void moveReplicaToNewVolume(String bpid, long blockId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
|
|
||||||
|
|
||||||
FsVolumeImpl targetVolume;
|
FsVolumeImpl targetVolume;
|
||||||
ReplicaInfo replicaInfo;
|
ReplicaInfo replicaInfo;
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (FsDatasetImpl.this) {
|
||||||
replicaInfo = volumeMap.get(bpid, blockId);
|
replicaInfo = volumeMap.get(bpid, blockId);
|
||||||
|
|
||||||
if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
|
if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
|
||||||
|
@ -2403,7 +2326,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
// Pick a target volume for the block.
|
// Pick a target volume for the block.
|
||||||
targetVolume = volumes.getNextVolume(
|
targetVolume = volumes.getNextVolume(
|
||||||
StorageType.DEFAULT, replicaInfo.getNumBytes());
|
StorageType.DEFAULT, replicaInfo.getNumBytes());
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
|
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
|
||||||
|
@ -2419,6 +2341,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
" to file " + savedBlockFile);
|
" to file " + savedBlockFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checkpoint a pending replica to persistent storage now.
|
* Checkpoint a pending replica to persistent storage now.
|
||||||
|
@ -2430,28 +2353,100 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
boolean succeeded = false;
|
boolean succeeded = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
synchronized (this) {
|
|
||||||
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
|
replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
|
||||||
if (replicaState == null) {
|
if (replicaState != null) {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Move the replica outside the lock.
|
// Move the replica outside the lock.
|
||||||
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
|
moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
|
||||||
|
}
|
||||||
succeeded = true;
|
succeeded = true;
|
||||||
} catch(IOException ioe) {
|
} catch(IOException ioe) {
|
||||||
LOG.warn("Exception saving replica " + replicaState, ioe);
|
LOG.warn("Exception saving replica " + replicaState, ioe);
|
||||||
} finally {
|
} finally {
|
||||||
if (!succeeded && replicaState != null) {
|
if (!succeeded && replicaState != null) {
|
||||||
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
|
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
|
||||||
lazyWriteReplicaTracker.reenqueueReplica(replicaState);
|
lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return succeeded;
|
return succeeded;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean transientFreeSpaceBelowThreshold() throws IOException {
|
||||||
|
long free = 0;
|
||||||
|
long capacity = 0;
|
||||||
|
|
||||||
|
// Don't worry about fragmentation for now. We don't expect more than one
|
||||||
|
// transient volume per DN.
|
||||||
|
for (FsVolumeImpl v : volumes.volumes) {
|
||||||
|
if (v.isTransientStorage()) {
|
||||||
|
capacity += v.getCapacity();
|
||||||
|
free += v.getAvailable();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (capacity == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int percentFree = (int) (free * 100 / capacity);
|
||||||
|
return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
|
||||||
|
free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to evict one or more transient block replicas we have at least
|
||||||
|
* spaceNeeded bytes free.
|
||||||
|
*/
|
||||||
|
private synchronized void evictBlocks() throws IOException {
|
||||||
|
int iterations = 0;
|
||||||
|
|
||||||
|
LazyWriteReplicaTracker.ReplicaState replicaState =
|
||||||
|
lazyWriteReplicaTracker.getNextCandidateForEviction();
|
||||||
|
|
||||||
|
while (replicaState != null &&
|
||||||
|
iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &
|
||||||
|
transientFreeSpaceBelowThreshold()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.info("Evicting block " + replicaState);
|
||||||
|
}
|
||||||
|
ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
|
||||||
|
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
|
||||||
|
File blockFile = replicaInfo.getBlockFile();
|
||||||
|
File metaFile = replicaInfo.getMetaFile();
|
||||||
|
long blockFileUsed = blockFile.length();
|
||||||
|
long metaFileUsed = metaFile.length();
|
||||||
|
lazyWriteReplicaTracker.discardReplica(replicaState, false);
|
||||||
|
|
||||||
|
// Move the replica from lazyPersist/ to finalized/ on target volume
|
||||||
|
BlockPoolSlice bpSlice =
|
||||||
|
replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
|
||||||
|
File newBlockFile = bpSlice.activateSavedReplica(
|
||||||
|
replicaInfo, replicaState.savedBlockFile);
|
||||||
|
|
||||||
|
ReplicaInfo newReplicaInfo =
|
||||||
|
new FinalizedReplica(replicaInfo.getBlockId(),
|
||||||
|
replicaInfo.getBytesOnDisk(),
|
||||||
|
replicaInfo.getGenerationStamp(),
|
||||||
|
replicaState.lazyPersistVolume,
|
||||||
|
newBlockFile.getParentFile());
|
||||||
|
|
||||||
|
// Update the volumeMap entry. This removes the old entry.
|
||||||
|
volumeMap.add(replicaState.bpid, newReplicaInfo);
|
||||||
|
|
||||||
|
// Remove the old replicas from transient storage.
|
||||||
|
if (blockFile.delete() || !blockFile.exists()) {
|
||||||
|
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
|
||||||
|
}
|
||||||
|
if (metaFile.delete() || !metaFile.exists()) {
|
||||||
|
((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If deletion failed then the directory scanner will cleanup the blocks
|
||||||
|
// eventually.
|
||||||
|
replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
int numSuccessiveFailures = 0;
|
int numSuccessiveFailures = 0;
|
||||||
|
@ -2459,11 +2454,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
while (fsRunning && shouldRun) {
|
while (fsRunning && shouldRun) {
|
||||||
try {
|
try {
|
||||||
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
|
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
|
||||||
|
evictBlocks();
|
||||||
|
|
||||||
// Sleep if we have no more work to do or if it looks like we are not
|
// Sleep if we have no more work to do or if it looks like we are not
|
||||||
// making any forward progress. This is to ensure that if all persist
|
// making any forward progress. This is to ensure that if all persist
|
||||||
// operations are failing we don't keep retrying them in a tight loop.
|
// operations are failing we don't keep retrying them in a tight loop.
|
||||||
if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
|
if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
|
||||||
Thread.sleep(checkpointerInterval * 1000);
|
Thread.sleep(checkpointerInterval * 1000);
|
||||||
numSuccessiveFailures = 0;
|
numSuccessiveFailures = 0;
|
||||||
}
|
}
|
||||||
|
@ -2471,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
LOG.info("LazyWriter was interrupted, exiting");
|
LOG.info("LazyWriter was interrupted, exiting");
|
||||||
break;
|
break;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Ignoring exception in LazyWriter:", e);
|
LOG.warn("Ignoring exception in LazyWriter:", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,30 +104,23 @@ class LazyWriteReplicaTracker {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue of replicas that need to be written to disk.
|
* Queue of replicas that need to be written to disk.
|
||||||
|
* Stale entries are GC'd by dequeueNextReplicaToPersist.
|
||||||
*/
|
*/
|
||||||
final Queue<ReplicaState> replicasNotPersisted;
|
final Queue<ReplicaState> replicasNotPersisted;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A map of blockId to persist complete time for transient blocks. This allows
|
* Queue of replicas in the order in which they were persisted.
|
||||||
* us to evict LRU blocks from transient storage. Protected by 'this'
|
* We'll dequeue them in the same order.
|
||||||
* Object lock.
|
* We can improve the eviction scheme later.
|
||||||
|
* Stale entries are GC'd by getNextCandidateForEviction.
|
||||||
*/
|
*/
|
||||||
final Map<ReplicaState, Long> replicasPersisted;
|
final Queue<ReplicaState> replicasPersisted;
|
||||||
|
|
||||||
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
|
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
|
||||||
this.fsDataset = fsDataset;
|
this.fsDataset = fsDataset;
|
||||||
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
|
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
|
||||||
replicasNotPersisted = new LinkedList<ReplicaState>();
|
replicasNotPersisted = new LinkedList<ReplicaState>();
|
||||||
replicasPersisted = new HashMap<ReplicaState, Long>();
|
replicasPersisted = new LinkedList<ReplicaState>();
|
||||||
}
|
|
||||||
|
|
||||||
TreeMultimap<Long, ReplicaState> getLruMap() {
|
|
||||||
// TODO: This can be made more efficient.
|
|
||||||
TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
|
|
||||||
for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
|
|
||||||
reversedMap.put(entry.getValue(), entry.getKey());
|
|
||||||
}
|
|
||||||
return reversedMap;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void addReplica(String bpid, long blockId,
|
synchronized void addReplica(String bpid, long blockId,
|
||||||
|
@ -171,7 +164,8 @@ class LazyWriteReplicaTracker {
|
||||||
// one.
|
// one.
|
||||||
replicasNotPersisted.remove(replicaState);
|
replicasNotPersisted.remove(replicaState);
|
||||||
}
|
}
|
||||||
replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
|
|
||||||
|
replicasPersisted.add(replicaState);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized ReplicaState dequeueNextReplicaToPersist() {
|
synchronized ReplicaState dequeueNextReplicaToPersist() {
|
||||||
|
@ -188,14 +182,36 @@ class LazyWriteReplicaTracker {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void reenqueueReplica(final ReplicaState replicaState) {
|
synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
|
||||||
replicasNotPersisted.add(replicaState);
|
replicasNotPersisted.add(replicaState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
|
||||||
|
replicasPersisted.add(replicaState);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized int numReplicasNotPersisted() {
|
synchronized int numReplicasNotPersisted() {
|
||||||
return replicasNotPersisted.size();
|
return replicasNotPersisted.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized ReplicaState getNextCandidateForEviction() {
|
||||||
|
while (replicasPersisted.size() != 0) {
|
||||||
|
ReplicaState replicaState = replicasPersisted.remove();
|
||||||
|
Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
|
||||||
|
|
||||||
|
if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
|
||||||
|
return replicaState;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The replica no longer exists, look for the next one.
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
void discardReplica(ReplicaState replicaState, boolean force) {
|
||||||
|
discardReplica(replicaState.bpid, replicaState.blockId, force);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized void discardReplica(
|
synchronized void discardReplica(
|
||||||
final String bpid, final long blockId, boolean force) {
|
final String bpid, final long blockId, boolean force) {
|
||||||
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
|
||||||
|
@ -221,9 +237,5 @@ class LazyWriteReplicaTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
map.remove(blockId);
|
map.remove(blockId);
|
||||||
replicasPersisted.remove(replicaState);
|
|
||||||
|
|
||||||
// Leave the replica in replicasNotPersisted if its present.
|
|
||||||
// dequeueNextReplicaToPersist will GC it eventually.
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue