mirror of https://github.com/apache/druid.git
simplify MapCache
This commit is contained in:
parent
e0c34c3b97
commit
40c0bcad29
|
@ -21,7 +21,6 @@ package com.metamx.druid.client.cache;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import com.metamx.common.ISE;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -34,21 +33,10 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
*/
|
*/
|
||||||
public class MapCache implements Cache
|
public class MapCache implements Cache
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* An interface to limit the operations that can be done on a Cache so that it is easier to reason about what
|
|
||||||
* is actually going to be done.
|
|
||||||
*/
|
|
||||||
public interface Cache
|
|
||||||
{
|
|
||||||
public byte[] get(byte[] key);
|
|
||||||
public void put(byte[] key, byte[] value);
|
|
||||||
public void close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private final Map<ByteBuffer, byte[]> baseMap;
|
private final Map<ByteBuffer, byte[]> baseMap;
|
||||||
private final ByteCountingLRUMap byteCountingLRUMap;
|
private final ByteCountingLRUMap byteCountingLRUMap;
|
||||||
|
|
||||||
private final Map<String, Cache> cacheCache;
|
private final Map<String, byte[]> namespaceId;
|
||||||
private final AtomicInteger ids;
|
private final AtomicInteger ids;
|
||||||
|
|
||||||
private final Object clearLock = new Object();
|
private final Object clearLock = new Object();
|
||||||
|
@ -75,7 +63,7 @@ public class MapCache implements Cache
|
||||||
|
|
||||||
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
|
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);
|
||||||
|
|
||||||
cacheCache = Maps.newHashMap();
|
namespaceId = Maps.newHashMap();
|
||||||
ids = new AtomicInteger();
|
ids = new AtomicInteger();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,51 +83,7 @@ public class MapCache implements Cache
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(NamedKey key)
|
public byte[] get(NamedKey key)
|
||||||
{
|
{
|
||||||
return provideCache(key.namespace).get(key.key);
|
final byte[] retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key));
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void put(NamedKey key, byte[] value)
|
|
||||||
{
|
|
||||||
provideCache(key.namespace).put(key.key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
|
|
||||||
{
|
|
||||||
Map<NamedKey, byte[]> retVal = Maps.newHashMap();
|
|
||||||
|
|
||||||
for(NamedKey key : keys) {
|
|
||||||
retVal.put(key, provideCache(key.namespace).get(key.key));
|
|
||||||
}
|
|
||||||
return retVal;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close(String namespace)
|
|
||||||
{
|
|
||||||
provideCache(namespace).close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Cache provideCache(final String identifier)
|
|
||||||
{
|
|
||||||
synchronized (cacheCache) {
|
|
||||||
final Cache cachedCache = cacheCache.get(identifier);
|
|
||||||
if (cachedCache != null) {
|
|
||||||
return cachedCache;
|
|
||||||
}
|
|
||||||
|
|
||||||
final byte[] myIdBytes = Ints.toByteArray(ids.getAndIncrement());
|
|
||||||
|
|
||||||
final Cache theCache = new Cache()
|
|
||||||
{
|
|
||||||
volatile boolean open = true;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte[] get(byte[] key)
|
|
||||||
{
|
|
||||||
if (open) {
|
|
||||||
final byte[] retVal = baseMap.get(computeKey(key));
|
|
||||||
if (retVal == null) {
|
if (retVal == null) {
|
||||||
missCount.incrementAndGet();
|
missCount.incrementAndGet();
|
||||||
} else {
|
} else {
|
||||||
|
@ -147,57 +91,68 @@ public class MapCache implements Cache
|
||||||
}
|
}
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
throw new ISE("Cache for namespace[%s] is closed.", identifier);
|
|
||||||
|
@Override
|
||||||
|
public void put(NamedKey key, byte[] value)
|
||||||
|
{
|
||||||
|
synchronized (clearLock) {
|
||||||
|
baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(byte[] key, byte[] value)
|
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
|
||||||
{
|
{
|
||||||
synchronized (clearLock) {
|
Map<NamedKey, byte[]> retVal = Maps.newHashMap();
|
||||||
if (open) {
|
for(NamedKey key : keys) {
|
||||||
baseMap.put(computeKey(key), value);
|
retVal.put(key, get(key));
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
return retVal;
|
||||||
throw new ISE("Cache for namespace[%s] is closed.", identifier);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close(String namespace)
|
||||||
{
|
{
|
||||||
synchronized (cacheCache) {
|
byte[] idBytes;
|
||||||
cacheCache.remove(identifier);
|
synchronized (namespaceId) {
|
||||||
|
idBytes = getNamespaceId(namespace);
|
||||||
|
if(idBytes == null) return;
|
||||||
|
|
||||||
|
namespaceId.remove(namespace);
|
||||||
}
|
}
|
||||||
synchronized (clearLock) {
|
synchronized (clearLock) {
|
||||||
if (open) {
|
|
||||||
open = false;
|
|
||||||
|
|
||||||
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
|
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
ByteBuffer next = iter.next();
|
ByteBuffer next = iter.next();
|
||||||
|
|
||||||
if (next.get(0) == myIdBytes[0]
|
if (next.get(0) == idBytes[0]
|
||||||
&& next.get(1) == myIdBytes[1]
|
&& next.get(1) == idBytes[1]
|
||||||
&& next.get(2) == myIdBytes[2]
|
&& next.get(2) == idBytes[2]
|
||||||
&& next.get(3) == myIdBytes[3]) {
|
&& next.get(3) == idBytes[3]) {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private byte[] getNamespaceId(final String identifier)
|
||||||
|
{
|
||||||
|
synchronized (namespaceId) {
|
||||||
|
byte[] idBytes = namespaceId.get(identifier);
|
||||||
|
if (idBytes != null) {
|
||||||
|
return idBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuffer computeKey(byte[] key)
|
idBytes = Ints.toByteArray(ids.getAndIncrement());
|
||||||
|
namespaceId.put(identifier, idBytes);
|
||||||
|
return idBytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteBuffer computeKey(byte[] idBytes, byte[] key)
|
||||||
{
|
{
|
||||||
final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(myIdBytes).put(key);
|
final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key);
|
||||||
retVal.rewind();
|
retVal.rewind();
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
cacheCache.put(identifier, theCache);
|
|
||||||
|
|
||||||
return theCache;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue