HDFS-10690. Optimize insertion/removal of replica in ShortCircuitCache. Contributed by Fenghua Hu.

This commit is contained in:
Xiaoyu Yao 2016-10-03 10:53:21 -07:00
parent de7a0a92ca
commit 607705c488
3 changed files with 69 additions and 45 deletions

View File

@ -26,13 +26,14 @@ import java.nio.MappedByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.TreeMap; import java.util.NoSuchElementException;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.ExtendedBlockId;
@ -107,16 +108,20 @@ public class ShortCircuitCache implements Closeable {
int numDemoted = demoteOldEvictableMmaped(curMs); int numDemoted = demoteOldEvictableMmaped(curMs);
int numPurged = 0; int numPurged = 0;
Long evictionTimeNs = (long) 0; Long evictionTimeNs;
while (true) { while (true) {
Entry<Long, ShortCircuitReplica> entry = Object eldestKey;
evictable.ceilingEntry(evictionTimeNs); try {
if (entry == null) break; eldestKey = evictable.firstKey();
evictionTimeNs = entry.getKey(); } catch (NoSuchElementException e) {
break;
}
evictionTimeNs = (Long)eldestKey;
long evictionTimeMs = long evictionTimeMs =
TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break; if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
ShortCircuitReplica replica = entry.getValue(); ShortCircuitReplica replica = (ShortCircuitReplica)evictable.get(
eldestKey);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("CacheCleaner: purging " + replica + ": " + LOG.trace("CacheCleaner: purging " + replica + ": " +
StringUtils.getStackTrace(Thread.currentThread())); StringUtils.getStackTrace(Thread.currentThread()));
@ -263,11 +268,11 @@ public class ShortCircuitCache implements Closeable {
private CacheCleaner cacheCleaner; private CacheCleaner cacheCleaner;
/** /**
* Tree of evictable elements. * LinkedMap of evictable elements.
* *
* Maps (unique) insertion time in nanoseconds to the element. * Maps (unique) insertion time in nanoseconds to the element.
*/ */
private final TreeMap<Long, ShortCircuitReplica> evictable = new TreeMap<>(); private final LinkedMap evictable = new LinkedMap();
/** /**
* Maximum total size of the cache, including both mmapped and * Maximum total size of the cache, including both mmapped and
@ -281,12 +286,11 @@ public class ShortCircuitCache implements Closeable {
private long maxNonMmappedEvictableLifespanMs; private long maxNonMmappedEvictableLifespanMs;
/** /**
* Tree of mmaped evictable elements. * LinkedMap of mmaped evictable elements.
* *
* Maps (unique) insertion time in nanoseconds to the element. * Maps (unique) insertion time in nanoseconds to the element.
*/ */
private final TreeMap<Long, ShortCircuitReplica> evictableMmapped = private final LinkedMap evictableMmapped = new LinkedMap();
new TreeMap<>();
/** /**
* Maximum number of mmaped evictable elements. * Maximum number of mmaped evictable elements.
@ -482,13 +486,16 @@ public class ShortCircuitCache implements Closeable {
private int demoteOldEvictableMmaped(long now) { private int demoteOldEvictableMmaped(long now) {
int numDemoted = 0; int numDemoted = 0;
boolean needMoreSpace = false; boolean needMoreSpace = false;
Long evictionTimeNs = (long) 0; Long evictionTimeNs;
while (true) { while (true) {
Entry<Long, ShortCircuitReplica> entry = Object eldestKey;
evictableMmapped.ceilingEntry(evictionTimeNs); try {
if (entry == null) break; eldestKey = evictableMmapped.firstKey();
evictionTimeNs = entry.getKey(); } catch (NoSuchElementException e) {
break;
}
evictionTimeNs = (Long)eldestKey;
long evictionTimeMs = long evictionTimeMs =
TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS); TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) { if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
@ -497,7 +504,8 @@ public class ShortCircuitCache implements Closeable {
} }
needMoreSpace = true; needMoreSpace = true;
} }
ShortCircuitReplica replica = entry.getValue(); ShortCircuitReplica replica = (ShortCircuitReplica)evictableMmapped.get(
eldestKey);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
String rationale = needMoreSpace ? "because we need more space" : String rationale = needMoreSpace ? "because we need more space" :
"because it's too old"; "because it's too old";
@ -527,10 +535,15 @@ public class ShortCircuitCache implements Closeable {
return; return;
} }
ShortCircuitReplica replica; ShortCircuitReplica replica;
if (evictableSize == 0) { try {
replica = evictableMmapped.firstEntry().getValue(); if (evictableSize == 0) {
} else { replica = (ShortCircuitReplica)evictableMmapped.get(evictableMmapped
replica = evictable.firstEntry().getValue(); .firstKey());
} else {
replica = (ShortCircuitReplica)evictable.get(evictable.firstKey());
}
} catch (NoSuchElementException e) {
break;
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(this + ": trimEvictionMaps is purging " + replica + LOG.trace(this + ": trimEvictionMaps is purging " + replica +
@ -573,10 +586,11 @@ public class ShortCircuitCache implements Closeable {
* @param map The map to remove it from. * @param map The map to remove it from.
*/ */
private void removeEvictable(ShortCircuitReplica replica, private void removeEvictable(ShortCircuitReplica replica,
TreeMap<Long, ShortCircuitReplica> map) { LinkedMap map) {
Long evictableTimeNs = replica.getEvictableTimeNs(); Long evictableTimeNs = replica.getEvictableTimeNs();
Preconditions.checkNotNull(evictableTimeNs); Preconditions.checkNotNull(evictableTimeNs);
ShortCircuitReplica removed = map.remove(evictableTimeNs); ShortCircuitReplica removed = (ShortCircuitReplica)map.remove(
evictableTimeNs);
Preconditions.checkState(removed == replica, Preconditions.checkState(removed == replica,
"failed to make %s unevictable", replica); "failed to make %s unevictable", replica);
replica.setEvictableTimeNs(null); replica.setEvictableTimeNs(null);
@ -593,7 +607,7 @@ public class ShortCircuitCache implements Closeable {
* @param map The map to insert it into. * @param map The map to insert it into.
*/ */
private void insertEvictable(Long evictionTimeNs, private void insertEvictable(Long evictionTimeNs,
ShortCircuitReplica replica, TreeMap<Long, ShortCircuitReplica> map) { ShortCircuitReplica replica, LinkedMap map) {
while (map.containsKey(evictionTimeNs)) { while (map.containsKey(evictionTimeNs)) {
evictionTimeNs++; evictionTimeNs++;
} }
@ -861,14 +875,22 @@ public class ShortCircuitCache implements Closeable {
IOUtilsClient.cleanup(LOG, cacheCleaner); IOUtilsClient.cleanup(LOG, cacheCleaner);
// Purge all replicas. // Purge all replicas.
while (true) { while (true) {
Entry<Long, ShortCircuitReplica> entry = evictable.firstEntry(); Object eldestKey;
if (entry == null) break; try {
purge(entry.getValue()); eldestKey = evictable.firstKey();
} catch (NoSuchElementException e) {
break;
}
purge((ShortCircuitReplica)evictable.get(eldestKey));
} }
while (true) { while (true) {
Entry<Long, ShortCircuitReplica> entry = evictableMmapped.firstEntry(); Object eldestKey;
if (entry == null) break; try {
purge(entry.getValue()); eldestKey = evictableMmapped.firstKey();
} catch (NoSuchElementException e) {
break;
}
purge((ShortCircuitReplica)evictableMmapped.get(eldestKey));
} }
} finally { } finally {
lock.unlock(); lock.unlock();
@ -909,8 +931,8 @@ public class ShortCircuitCache implements Closeable {
void visit(int numOutstandingMmaps, void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped); LinkedMap evictableMmapped);
} }
@VisibleForTesting // ONLY for testing @VisibleForTesting // ONLY for testing

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.SystemUtils; import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -307,8 +308,8 @@ public class TestEnhancedByteBufferAccess {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
if (expectedNumOutstandingMmaps >= 0) { if (expectedNumOutstandingMmaps >= 0) {
Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps); Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps);
} }
@ -373,8 +374,8 @@ public class TestEnhancedByteBufferAccess {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
ShortCircuitReplica replica = replicas.get( ShortCircuitReplica replica = replicas.get(
new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
Assert.assertNotNull(replica); Assert.assertNotNull(replica);
@ -410,8 +411,8 @@ public class TestEnhancedByteBufferAccess {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
finished.setValue(evictableMmapped.isEmpty()); finished.setValue(evictableMmapped.isEmpty());
} }
}); });
@ -685,8 +686,8 @@ public class TestEnhancedByteBufferAccess {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps); Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps);
ShortCircuitReplica replica = ShortCircuitReplica replica =
replicas.get(ExtendedBlockId.fromExtendedBlock(block)); replicas.get(ExtendedBlockId.fromExtendedBlock(block));

View File

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -502,8 +503,8 @@ public class TestShortCircuitCache {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
ShortCircuitReplica replica = replicas.get( ShortCircuitReplica replica = replicas.get(
ExtendedBlockId.fromExtendedBlock(block)); ExtendedBlockId.fromExtendedBlock(block));
Assert.assertNotNull(replica); Assert.assertNotNull(replica);
@ -518,8 +519,8 @@ public class TestShortCircuitCache {
public void visit(int numOutstandingMmaps, public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas, Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads, Map<ExtendedBlockId, InvalidToken> failedLoads,
Map<Long, ShortCircuitReplica> evictable, LinkedMap evictable,
Map<Long, ShortCircuitReplica> evictableMmapped) { LinkedMap evictableMmapped) {
ShortCircuitReplica replica = replicas.get( ShortCircuitReplica replica = replicas.get(
ExtendedBlockId.fromExtendedBlock(block)); ExtendedBlockId.fromExtendedBlock(block));
Assert.assertNotNull(replica); Assert.assertNotNull(replica);