SOLR-9284: The HDFS BlockDirectoryCache should not let it's keysToRelease or names maps grow indefinitely.

This commit is contained in:
markrmiller 2016-11-15 03:32:53 -05:00
parent 0d290ae136
commit 0325722e67
4 changed files with 51 additions and 11 deletions

View File

@ -156,7 +156,9 @@ Bug Fixes
of the first slice. This puts undue pressure on leader cores and likely on the wrong ones. This is
fixed to randomly pick a leader on updates or a replica core otherwise. (Cao Manh Dat via shalin)
* SOLR-9284: The HDFS BlockDirectoryCache should not let it's keysToRelease or names maps grow indefinitely.
(Mark Miller, Michael Sun)
Other Changes
----------------------

View File

@ -38,6 +38,11 @@ public class BlockCache {
private final int numberOfBlocksPerBank;
private final int maxEntries;
private final Metrics metrics;
private volatile OnRelease onRelease;
public static interface OnRelease {
public void release(BlockCacheKey blockCacheKey);
}
public BlockCache(Metrics metrics, boolean directAllocation, long totalMemory) {
this(metrics, directAllocation, totalMemory, _128M);
@ -69,7 +74,7 @@ public class BlockCache {
}
RemovalListener<BlockCacheKey,BlockCacheLocation> listener =
notification -> releaseLocation(notification.getValue());
notification -> releaseLocation(notification.getKey(), notification.getValue());
cache = Caffeine.newBuilder()
.removalListener(listener)
.maximumSize(maxEntries)
@ -81,7 +86,7 @@ public class BlockCache {
cache.invalidate(key);
}
private void releaseLocation(BlockCacheLocation location) {
private void releaseLocation(BlockCacheKey blockCacheKey, BlockCacheLocation location) {
if (location == null) {
return;
}
@ -90,6 +95,9 @@ public class BlockCache {
location.setRemoved(true);
locks[bankId].clear(block);
lockCounters[bankId].decrementAndGet();
if (onRelease != null) {
onRelease.release(blockCacheKey);
}
metrics.blockCacheEviction.incrementAndGet();
metrics.blockCacheSize.decrementAndGet();
}
@ -200,4 +208,8 @@ public class BlockCache {
public int getSize() {
return cache.asMap().size();
}
void setOnRelease(OnRelease onRelease) {
this.onRelease = onRelease;
}
}

View File

@ -17,18 +17,22 @@
package org.apache.solr.store.blockcache;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.store.blockcache.BlockCache.OnRelease;
import com.github.benmanes.caffeine.cache.Caffeine;
/**
* @lucene.experimental
*/
public class BlockDirectoryCache implements Cache {
private final BlockCache blockCache;
private final AtomicInteger counter = new AtomicInteger();
private final Map<String,Integer> names = new ConcurrentHashMap<>(8192, 0.75f, 512);
private final com.github.benmanes.caffeine.cache.Cache<String,Integer> names;
private Set<BlockCacheKey> keysToRelease;
private final String path;
private final Metrics metrics;
@ -41,11 +45,21 @@ public class BlockDirectoryCache implements Cache {
this.blockCache = blockCache;
this.path = path;
this.metrics = metrics;
names = Caffeine.newBuilder().maximumSize(50000).build();
if (releaseBlocks) {
keysToRelease = Collections.newSetFromMap(new ConcurrentHashMap<BlockCacheKey,Boolean>(1024, 0.75f, 512));
blockCache.setOnRelease(new OnRelease() {
@Override
public void release(BlockCacheKey key) {
keysToRelease.remove(key);
}
});
}
}
/**
* Expert: mostly for tests
*
@ -57,13 +71,13 @@ public class BlockDirectoryCache implements Cache {
@Override
public void delete(String name) {
names.remove(name);
names.invalidate(name);
}
@Override
public void update(String name, long blockId, int blockOffset, byte[] buffer,
int offset, int length) {
Integer file = names.get(name);
Integer file = names.getIfPresent(name);
if (file == null) {
file = counter.incrementAndGet();
names.put(name, file);
@ -80,7 +94,7 @@ public class BlockDirectoryCache implements Cache {
@Override
public boolean fetch(String name, long blockId, int blockOffset, byte[] b,
int off, int lengthToReadInBlock) {
Integer file = names.get(name);
Integer file = names.getIfPresent(name);
if (file == null) {
return false;
}
@ -105,7 +119,8 @@ public class BlockDirectoryCache implements Cache {
@Override
public void renameCacheFile(String source, String dest) {
Integer file = names.remove(source);
Integer file = names.getIfPresent(source);
names.invalidate(source);
// possible if the file is empty
if (file != null) {
names.put(dest, file);

View File

@ -110,7 +110,18 @@ public class BlockDirectoryTest extends SolrTestCaseJ4 {
file = createTempDir().toFile();
FSDirectory dir = FSDirectory.open(new File(file, "base").toPath());
mapperCache = new MapperCache();
directory = new BlockDirectory("test", dir, mapperCache, null, true, true);
if (random().nextBoolean()) {
Metrics metrics = new Metrics();
int blockSize = 8192;
int slabSize = blockSize * 32768;
long totalMemory = 2 * slabSize;
BlockCache blockCache = new BlockCache(metrics, true, totalMemory, slabSize, blockSize);
BlockDirectoryCache cache = new BlockDirectoryCache(blockCache, "/collection1", metrics, true);
directory = new BlockDirectory("test", dir, cache, null, true, false);
} else {
directory = new BlockDirectory("test", dir, mapperCache, null, true, true);
}
random = random();
}