mirror of https://github.com/apache/druid.git
include actual key in value to detect improbable hash collisions
This commit is contained in:
parent
d5cf7cfdb3
commit
3aef020fe0
|
@ -19,8 +19,11 @@
|
|||
|
||||
package com.metamx.druid.client.cache;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -48,6 +51,14 @@ public interface Cache
|
|||
this.key = key;
|
||||
}
|
||||
|
||||
public byte[] toByteArray() {
|
||||
final byte[] nsBytes = this.namespace.getBytes(Charsets.UTF_8);
|
||||
return ByteBuffer.allocate(Ints.BYTES + nsBytes.length + this.key.length)
|
||||
.putInt(nsBytes.length)
|
||||
.put(nsBytes)
|
||||
.put(this.key).array();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Ints;
|
||||
import net.spy.memcached.AddrUtil;
|
||||
import net.spy.memcached.ConnectionFactoryBuilder;
|
||||
import net.spy.memcached.DefaultHashAlgorithm;
|
||||
|
@ -35,6 +36,8 @@ import org.apache.commons.codec.digest.DigestUtils;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
@ -109,7 +112,7 @@ public class MemcachedCache implements Cache
|
|||
@Override
|
||||
public byte[] get(NamedKey key)
|
||||
{
|
||||
Future<Object> future = client.asyncGet(computeKeyString(memcachedPrefix, key));
|
||||
Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
|
||||
try {
|
||||
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
|
||||
if(bytes != null) {
|
||||
|
@ -118,7 +121,7 @@ public class MemcachedCache implements Cache
|
|||
else {
|
||||
missCount.incrementAndGet();
|
||||
}
|
||||
return bytes;
|
||||
return bytes == null ? null : deserializeValue(key, bytes);
|
||||
}
|
||||
catch(TimeoutException e) {
|
||||
timeoutCount.incrementAndGet();
|
||||
|
@ -137,7 +140,30 @@ public class MemcachedCache implements Cache
|
|||
@Override
|
||||
public void put(NamedKey key, byte[] value)
|
||||
{
|
||||
client.set(computeKeyString(memcachedPrefix, key), expiration, value);
|
||||
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
|
||||
}
|
||||
|
||||
private static byte[] serializeValue(NamedKey key, byte[] value) {
|
||||
byte[] keyBytes = key.toByteArray();
|
||||
return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length)
|
||||
.putInt(keyBytes.length)
|
||||
.put(keyBytes)
|
||||
.put(value)
|
||||
.array();
|
||||
}
|
||||
|
||||
private static byte[] deserializeValue(NamedKey key, byte[] bytes) {
|
||||
ByteBuffer buf = ByteBuffer.wrap(bytes);
|
||||
|
||||
final int keyLength = buf.getInt();
|
||||
byte[] keyBytes = new byte[keyLength];
|
||||
buf.get(keyBytes);
|
||||
byte[] value = new byte[buf.remaining()];
|
||||
buf.get(value);
|
||||
|
||||
Preconditions.checkState(Arrays.equals(keyBytes, key.toByteArray()),
|
||||
"Keys do not match, possible hash collision?");
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -152,7 +178,7 @@ public class MemcachedCache implements Cache
|
|||
@Nullable NamedKey input
|
||||
)
|
||||
{
|
||||
return computeKeyString(memcachedPrefix, input);
|
||||
return computeKeyHash(memcachedPrefix, input);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -171,9 +197,11 @@ public class MemcachedCache implements Cache
|
|||
|
||||
Map<NamedKey, byte[]> results = Maps.newHashMap();
|
||||
for(Map.Entry<String, Object> entry : some.entrySet()) {
|
||||
final NamedKey key = keyLookup.get(entry.getKey());
|
||||
final byte[] value = (byte[]) entry.getValue();
|
||||
results.put(
|
||||
keyLookup.get(entry.getKey()),
|
||||
(byte[])entry.getValue()
|
||||
key,
|
||||
value == null ? null : deserializeValue(key, value)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -201,7 +229,7 @@ public class MemcachedCache implements Cache
|
|||
- 2 // length of separators
|
||||
;
|
||||
|
||||
private static String computeKeyString(String memcachedPrefix, NamedKey key) {
|
||||
private static String computeKeyHash(String memcachedPrefix, NamedKey key) {
|
||||
// hash keys to keep things under 250 characters for memcached
|
||||
return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue