diff --git a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java index 2780acd0f9e..53e1e20280a 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MapCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MapCache.java @@ -21,7 +21,6 @@ package com.metamx.druid.client.cache; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; -import com.metamx.common.ISE; import java.nio.ByteBuffer; import java.util.Collections; @@ -34,21 +33,10 @@ import java.util.concurrent.atomic.AtomicLong; */ public class MapCache implements Cache { - /** - * An interface to limit the operations that can be done on a Cache so that it is easier to reason about what - * is actually going to be done. - */ - public interface Cache - { - public byte[] get(byte[] key); - public void put(byte[] key, byte[] value); - public void close(); - } - private final Map baseMap; private final ByteCountingLRUMap byteCountingLRUMap; - private final Map cacheCache; + private final Map namespaceId; private final AtomicInteger ids; private final Object clearLock = new Object(); @@ -75,7 +63,7 @@ public class MapCache implements Cache this.baseMap = Collections.synchronizedMap(byteCountingLRUMap); - cacheCache = Maps.newHashMap(); + namespaceId = Maps.newHashMap(); ids = new AtomicInteger(); } @@ -95,22 +83,29 @@ public class MapCache implements Cache @Override public byte[] get(NamedKey key) { - return provideCache(key.namespace).get(key.key); + final byte[] retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key)); + if (retVal == null) { + missCount.incrementAndGet(); + } else { + hitCount.incrementAndGet(); + } + return retVal; } @Override public void put(NamedKey key, byte[] value) { - provideCache(key.namespace).put(key.key, value); + synchronized (clearLock) { + baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value); + } } @Override public Map getBulk(Iterable keys) { Map retVal = Maps.newHashMap(); - for(NamedKey key : keys) { - retVal.put(key, provideCache(key.namespace).get(key.key)); + retVal.put(key, get(key)); } return retVal; } @@ -118,86 +113,46 @@ public class MapCache implements Cache @Override public void close(String namespace) { - provideCache(namespace).close(); - } + byte[] idBytes; + synchronized (namespaceId) { + idBytes = getNamespaceId(namespace); + if(idBytes == null) return; - private Cache provideCache(final String identifier) - { - synchronized (cacheCache) { - final Cache cachedCache = cacheCache.get(identifier); - if (cachedCache != null) { - return cachedCache; + namespaceId.remove(namespace); + } + synchronized (clearLock) { + Iterator iter = baseMap.keySet().iterator(); + while (iter.hasNext()) { + ByteBuffer next = iter.next(); + + if (next.get(0) == idBytes[0] + && next.get(1) == idBytes[1] + && next.get(2) == idBytes[2] + && next.get(3) == idBytes[3]) { + iter.remove(); + } } - - final byte[] myIdBytes = Ints.toByteArray(ids.getAndIncrement()); - - final Cache theCache = new Cache() - { - volatile boolean open = true; - - @Override - public byte[] get(byte[] key) - { - if (open) { - final byte[] retVal = baseMap.get(computeKey(key)); - if (retVal == null) { - missCount.incrementAndGet(); - } else { - hitCount.incrementAndGet(); - } - return retVal; - } - throw new ISE("Cache for namespace[%s] is closed.", identifier); - } - - @Override - public void put(byte[] key, byte[] value) - { - synchronized (clearLock) { - if (open) { - baseMap.put(computeKey(key), value); - return; - } - } - throw new ISE("Cache for namespace[%s] is closed.", identifier); - } - - @Override - public void close() - { - synchronized (cacheCache) { - cacheCache.remove(identifier); - } - synchronized (clearLock) { - if (open) { - open = false; - - Iterator iter = baseMap.keySet().iterator(); - while (iter.hasNext()) { - ByteBuffer next = iter.next(); - - if (next.get(0) == myIdBytes[0] - && next.get(1) == myIdBytes[1] - && next.get(2) == myIdBytes[2] - && next.get(3) == myIdBytes[3]) { - iter.remove(); - } - } - } - } - } - - private ByteBuffer computeKey(byte[] key) - { - final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(myIdBytes).put(key); - retVal.rewind(); - return retVal; - } - }; - - cacheCache.put(identifier, theCache); - - return theCache; } } + + private byte[] getNamespaceId(final String identifier) + { + synchronized (namespaceId) { + byte[] idBytes = namespaceId.get(identifier); + if (idBytes != null) { + return idBytes; + } + + idBytes = Ints.toByteArray(ids.getAndIncrement()); + namespaceId.put(identifier, idBytes); + return idBytes; + } + } + + private ByteBuffer computeKey(byte[] idBytes, byte[] key) + { + final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key); + retVal.rewind(); + return retVal; + } }