From 9179718db5513b4742893360d65e214c0ede42c9 Mon Sep 17 00:00:00 2001
From: xvrl
Date: Thu, 31 Jan 2013 14:53:50 -0800
Subject: [PATCH 1/4] hash cache keys to stay under memcached limit of 250 characters

---
 .../druid/client/cache/MemcachedCache.java    | 30 ++++++++++++++-----
 .../client/cache/MemcachedCacheConfig.java    |  3 ++
 .../cache/MemcachedCacheBrokerBenchmark.java  |  1 +
 .../cache/MemcachedCacheBrokerTest.java       |  2 +-
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
index 86cbf5153e5..d8cd5e5ae8e 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
@@ -20,9 +20,9 @@
 package com.metamx.druid.client.cache;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
-import net.iharder.base64.Base64;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.DefaultHashAlgorithm;
@@ -31,6 +31,7 @@ import net.spy.memcached.MemcachedClient;
 import net.spy.memcached.MemcachedClientIF;
 import net.spy.memcached.internal.BulkFuture;
 import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.commons.codec.digest.DigestUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -62,6 +63,7 @@ public class MemcachedCache implements Cache
               .build(),
           AddrUtil.getAddresses(config.getHosts())
         ),
+        config.getMemcachedPrefix(),
         config.getTimeout(),
         config.getExpiration()
     );
@@ -72,6 +74,7 @@ public class MemcachedCache implements Cache
 
   private final int timeout;
   private final int expiration;
+  private final String memcachedPrefix;
 
   private final MemcachedClientIF client;
 
@@ -79,10 +82,15 @@
   private final AtomicLong missCount = new AtomicLong(0);
   private final AtomicLong timeoutCount = new AtomicLong(0);
 
-  MemcachedCache(MemcachedClientIF client, int timeout, int expiration) {
+  MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
+    Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                                "memcachedPrefix length [%d] exceeds maximum length [%d]",
+                                memcachedPrefix.length(),
+                                MAX_PREFIX_LENGTH);
     this.timeout = timeout;
     this.expiration = expiration;
    this.client = client;
+    this.memcachedPrefix = memcachedPrefix;
   }
 
   @Override
@@ -101,7 +109,7 @@ public class MemcachedCache implements Cache
   @Override
   public byte[] get(NamedKey key)
   {
-    Future future = client.asyncGet(computeKeyString(key));
+    Future future = client.asyncGet(computeKeyString(memcachedPrefix, key));
     try {
       byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
       if(bytes != null) {
@@ -129,7 +137,7 @@ public class MemcachedCache implements Cache
   @Override
   public void put(NamedKey key, byte[] value)
   {
-    client.set(computeKeyString(key), expiration, value);
+    client.set(computeKeyString(memcachedPrefix, key), expiration, value);
   }
 
   @Override
@@ -144,7 +152,7 @@ public class MemcachedCache implements Cache
               @Nullable NamedKey input
           )
           {
-            return computeKeyString(input);
+            return computeKeyString(memcachedPrefix, input);
           }
         }
     );
@@ -186,7 +194,15 @@ public class MemcachedCache implements Cache
     // no resources to cleanup
   }
 
-  private static String computeKeyString(NamedKey key) {
-    return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES);
+  public static final int MAX_PREFIX_LENGTH =
+      MemcachedClientIF.MAX_KEY_LENGTH
+      - 40 // length of namespace hash
+      - 40 // length of key hash
+      - 2  // length of separators
+      ;
+
+  private static String computeKeyString(String memcachedPrefix, NamedKey key) {
+    // hash keys to keep things under 250 characters for memcached
+    return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
   }
 }
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java
index 83f626d8641..c2a277fb72f 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCacheConfig.java
@@ -18,4 +18,7 @@ public abstract class MemcachedCacheConfig
 
   @Config("${prefix}.maxObjectSize")
   public abstract int getMaxObjectSize();
+
+  @Config("${prefix}.memcachedPrefix")
+  public abstract String getMemcachedPrefix();
 }
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
index d87dfd5f7a1..7827e0779b0 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
@@ -56,6 +56,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
 
     cache = new MemcachedCache(
         client,
+        "druid-memcached-benchmark",
        30000, // 30 seconds
         3600 // 1 hour
     );
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
index 87c1dcdd9f4..a87441d5b36 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
@@ -60,7 +60,7 @@ public class MemcachedCacheBrokerTest
   public void setUp() throws Exception
   {
     MemcachedClientIF client = new MockMemcachedClient();
-    cache = new MemcachedCache(client, 500, 3600);
+    cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
   }
 
   @Test
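Note: for reference, the key scheme introduced in PATCH 1/4 can be exercised on its own. The sketch below is a minimal, hypothetical illustration (the class name and sample values are made up; it only assumes commons-codec, which the patch itself pulls in): two SHA-1 hex digests are 40 characters each, so with the two ':' separators the configured prefix gets whatever remains of memcached's 250-character key limit, which is exactly how MAX_PREFIX_LENGTH is derived.

    import org.apache.commons.codec.digest.DigestUtils;

    // Hypothetical, standalone illustration of the hashed-key scheme; not part of the patch.
    public class KeyHashSketch
    {
      private static final int MEMCACHED_MAX_KEY_LENGTH = 250; // memcached protocol limit

      public static void main(String[] args)
      {
        String prefix = "druid-memcached-example";  // assumed prefix; must fit within MAX_PREFIX_LENGTH
        String namespace = "default";
        byte[] key = "some_very_long_segment_identifier_2012-11-26T00:00:00.000Z".getBytes();

        // Same shape as computeKeyString(): prefix + ":" + sha1(namespace) + ":" + sha1(key)
        String memcachedKey = prefix + ":" + DigestUtils.sha1Hex(namespace) + ":" + DigestUtils.sha1Hex(key);

        // 40 + 40 hex characters for the two digests plus 2 separators, regardless of input size.
        System.out.println(memcachedKey.length() + " <= " + MEMCACHED_MAX_KEY_LENGTH);
      }
    }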
From d5cf7cfdb3fa2065c9855a994fde1d620a547c06 Mon Sep 17 00:00:00 2001
From: xvrl
Date: Thu, 31 Jan 2013 14:55:02 -0800
Subject: [PATCH 2/4] remove references to 'broker'

---
 .../cache/{MapCacheBrokerTest.java => MapCacheTest.java}      | 2 +-
 ...CacheBrokerBenchmark.java => MemcachedCacheBenchmark.java} | 4 ++--
 ...{MemcachedCacheBrokerTest.java => MemcachedCacheTest.java} | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)
 rename client/src/test/java/com/metamx/druid/client/cache/{MapCacheBrokerTest.java => MapCacheTest.java} (98%)
 rename client/src/test/java/com/metamx/druid/client/cache/{MemcachedCacheBrokerBenchmark.java => MemcachedCacheBenchmark.java} (96%)
 rename client/src/test/java/com/metamx/druid/client/cache/{MemcachedCacheBrokerTest.java => MemcachedCacheTest.java} (99%)

diff --git a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MapCacheTest.java
similarity index 98%
rename from client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java
rename to client/src/test/java/com/metamx/druid/client/cache/MapCacheTest.java
index 78f071ca539..23a3bd1d641 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MapCacheBrokerTest.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MapCacheTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
 
 /**
  */
-public class MapCacheBrokerTest
+public class MapCacheTest
 {
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBenchmark.java
similarity index 96%
rename from client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
rename to client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBenchmark.java
index 7827e0779b0..3a746c9484b 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerBenchmark.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBenchmark.java
@@ -17,7 +17,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
+public class MemcachedCacheBenchmark extends SimpleBenchmark
 {
   private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
   public static final String NAMESPACE = "default";
@@ -114,6 +114,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
   }
 
   public static void main(String[] args) throws Exception {
-    Runner.main(MemcachedCacheBrokerBenchmark.class, args);
+    Runner.main(MemcachedCacheBenchmark.class, args);
   }
 }
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
similarity index 99%
rename from client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
rename to client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
index a87441d5b36..287d208db62 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheBrokerTest.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeoutException;
 
 /**
  */
-public class MemcachedCacheBrokerTest
+public class MemcachedCacheTest
 {
   private static final byte[] HI = "hi".getBytes();
   private static final byte[] HO = "ho".getBytes();
From 3aef020fe0529da90d95e27fbc76082021fdab1f Mon Sep 17 00:00:00 2001
From: xvrl
Date: Thu, 31 Jan 2013 15:54:52 -0800
Subject: [PATCH 3/4] include actual key in value to detect improbable hash collisions

---
 .../com/metamx/druid/client/cache/Cache.java  | 11 +++++
 .../druid/client/cache/MemcachedCache.java    | 42 +++++++++++++++----
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/com/metamx/druid/client/cache/Cache.java b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
index 9bf0cde33e7..6e9463deb56 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/Cache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/Cache.java
@@ -19,8 +19,11 @@
 
 package com.metamx.druid.client.cache;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -48,6 +51,14 @@ public interface Cache
       this.key = key;
     }
 
+    public byte[] toByteArray() {
+      final byte[] nsBytes = this.namespace.getBytes(Charsets.UTF_8);
+      return ByteBuffer.allocate(Ints.BYTES + nsBytes.length + this.key.length)
+                       .putInt(nsBytes.length)
+                       .put(nsBytes)
+                       .put(this.key).array();
+    }
+
     @Override
     public boolean equals(Object o)
     {
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
index d8cd5e5ae8e..ed7f5292e8a 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
 import net.spy.memcached.AddrUtil;
 import net.spy.memcached.ConnectionFactoryBuilder;
 import net.spy.memcached.DefaultHashAlgorithm;
@@ -35,6 +36,8 @@ import org.apache.commons.codec.digest.DigestUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -109,7 +112,7 @@ public class MemcachedCache implements Cache
   @Override
   public byte[] get(NamedKey key)
   {
-    Future future = client.asyncGet(computeKeyString(memcachedPrefix, key));
+    Future future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
     try {
       byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
       if(bytes != null) {
@@ -118,7 +121,7 @@ public class MemcachedCache implements Cache
       else {
         missCount.incrementAndGet();
       }
-      return bytes;
+      return bytes == null ? null : deserializeValue(key, bytes);
     }
     catch(TimeoutException e) {
       timeoutCount.incrementAndGet();
@@ -137,7 +140,30 @@ public class MemcachedCache implements Cache
   @Override
   public void put(NamedKey key, byte[] value)
   {
-    client.set(computeKeyString(memcachedPrefix, key), expiration, value);
+    client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
+  }
+
+  private static byte[] serializeValue(NamedKey key, byte[] value) {
+    byte[] keyBytes = key.toByteArray();
+    return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length)
+                     .putInt(keyBytes.length)
+                     .put(keyBytes)
+                     .put(value)
+                     .array();
+  }
+
+  private static byte[] deserializeValue(NamedKey key, byte[] bytes) {
+    ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+    final int keyLength = buf.getInt();
+    byte[] keyBytes = new byte[keyLength];
+    buf.get(keyBytes);
+    byte[] value = new byte[buf.remaining()];
+    buf.get(value);
+
+    Preconditions.checkState(Arrays.equals(keyBytes, key.toByteArray()),
+                             "Keys do not match, possible hash collision?");
+    return value;
   }
 
   @Override
@@ -152,7 +178,7 @@ public class MemcachedCache implements Cache
               @Nullable NamedKey input
           )
           {
-            return computeKeyString(memcachedPrefix, input);
+            return computeKeyHash(memcachedPrefix, input);
           }
         }
     );
@@ -171,9 +197,11 @@ public class MemcachedCache implements Cache
     Map results = Maps.newHashMap();
 
     for(Map.Entry entry : some.entrySet()) {
+      final NamedKey key = keyLookup.get(entry.getKey());
+      final byte[] value = (byte[]) entry.getValue();
       results.put(
-          keyLookup.get(entry.getKey()),
-          (byte[])entry.getValue()
+          key,
+          value == null ? null : deserializeValue(key, value)
       );
     }
 
@@ -201,7 +229,7 @@ public class MemcachedCache implements Cache
       - 2  // length of separators
       ;
 
-  private static String computeKeyString(String memcachedPrefix, NamedKey key) {
+  private static String computeKeyHash(String memcachedPrefix, NamedKey key) {
     // hash keys to keep things under 250 characters for memcached
     return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
   }
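Note: to make the collision check in PATCH 3/4 concrete, the following standalone sketch (hypothetical class name and sample data, using only java.nio and java.util) walks through the value layout the patch stores in memcached: a 4-byte length prefix, the serialized NamedKey, then the payload. On read, the embedded key is compared against the requested key, so an improbable SHA-1 collision surfaces as an error rather than silently returning another entry's bytes.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    // Hypothetical round-trip of the stored-value layout: [key length][key bytes][payload bytes].
    public class ValueLayoutSketch
    {
      public static void main(String[] args)
      {
        byte[] keyBytes = "default:some-query-key".getBytes();  // stands in for NamedKey.toByteArray()
        byte[] payload = "cached result".getBytes();

        // serialize: length-prefix the key, then append the payload
        byte[] stored = ByteBuffer.allocate(4 + keyBytes.length + payload.length)
                                  .putInt(keyBytes.length)
                                  .put(keyBytes)
                                  .put(payload)
                                  .array();

        // deserialize: recover the embedded key and verify it before trusting the payload
        ByteBuffer buf = ByteBuffer.wrap(stored);
        byte[] embeddedKey = new byte[buf.getInt()];
        buf.get(embeddedKey);
        byte[] value = new byte[buf.remaining()];
        buf.get(value);

        if (!Arrays.equals(embeddedKey, keyBytes)) {
          throw new IllegalStateException("Keys do not match, possible hash collision?");
        }
        System.out.println(new String(value)); // prints "cached result"
      }
    }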
From 393bec0539e3e747cc7b9ecc1950cbce6fa15491 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 1 Feb 2013 12:16:06 -0800
Subject: [PATCH 4/4] TaskQueue: Behavior tweaks, simplification

- Lock tasks on add if possible
- Detect "already added" using exception test instead of bookkeeping map
- Update task status after commitRunnable instead of before commitRunnable
---
 .../druid/merger/coordinator/TaskQueue.java   | 70 +++++++++++--------
 1 file changed, 41 insertions(+), 29 deletions(-)

diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
index 6c7058475f1..e228b401025 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
@@ -73,7 +73,6 @@ public class TaskQueue
 {
   private final List queue = Lists.newLinkedList();
   private final Map> running = Maps.newHashMap();
-  private final Multimap seenNextTasks = HashMultimap.create();
 
   private final TaskStorage taskStorage;
 
@@ -199,6 +198,11 @@ public class TaskQueue
       queue.add(task);
       workMayBeAvailable.signalAll();
 
+      // Attempt to add this task to a running task group. Silently continue if this is not possible.
+      // The main reason this is here is so when subtasks are added, they end up in the same task group
+      // as their parent whenever possible.
+      tryLock(task);
+
       return true;
     }
     finally {
@@ -274,26 +278,26 @@ public class TaskQueue
    * Finally, if this task is not supposed to be running, this method will simply do nothing.
    *
    * @param task task to update
-   * @param status new task status
+   * @param originalStatus new task status
    * @param commitRunnable operation to perform if this task is ready to commit
    *
   * @throws NullPointerException if task or status is null
   * @throws IllegalArgumentException if the task ID does not match the status ID
   * @throws IllegalStateException if this queue is currently shut down
    */
-  public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable)
+  public void notify(final Task task, final TaskStatus originalStatus, final Runnable commitRunnable)
   {
     giant.lock();
 
     try {
       Preconditions.checkNotNull(task, "task");
-      Preconditions.checkNotNull(status, "status");
+      Preconditions.checkNotNull(originalStatus, "status");
       Preconditions.checkState(active, "Queue is not active!");
       Preconditions.checkArgument(
-          task.getId().equals(status.getId()),
+          task.getId().equals(originalStatus.getId()),
           "Mismatching task ids[%s/%s]",
          task.getId(),
-          status.getId()
+          originalStatus.getId()
      );
 
      final TaskGroup taskGroup;
@@ -306,20 +310,13 @@ public class TaskQueue
         taskGroup = maybeTaskGroup.get();
       }
 
-      // Update status in DB
-      // TODO: We can either do this first, in which case we run the risk of having a task marked done in the DB but
-      // TODO: not committed here; or we can do it last, in which case we run the risk of having a task marked running
-      // TODO: in the DB but committed here. Currently, we err on the former side because we don't want a ticking time
-      // TODO: bomb in the DB (a task marked running that we have forgotten about, which will potentially be re-
-      // TODO: started much later when a coordinator bootstraps).
-      // TODO:
-      // TODO: Eventually we should have this status update enter a retry queue instead of throwing an exception
-      // TODO: if it fails.
-      taskStorage.setStatus(task.getId(), status);
+      // This is what we want to write to the DB when we're done.
+      // Not final, since we might need to reassign the var later if the commitRunnable fails.
+      TaskStatus statusToSave = originalStatus;
 
       // Should we commit?
-      if (taskGroup.getCommitStyle().shouldCommit(task, status)) {
-        log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId());
+      if (taskGroup.getCommitStyle().shouldCommit(task, statusToSave)) {
+        log.info("Committing %s status for task: %s", statusToSave.getStatusCode(), task.getId());
 
         // Add next tasks
         try {
@@ -330,12 +327,10 @@ public class TaskQueue
 
           // We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over.
           // So, we need to remember which ones we've already spawned and not do them again.
-          for (final Task nextTask : status.getNextTasks()) {
-            if (!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) {
+          for (final Task nextTask : statusToSave.getNextTasks()) {
+            try {
               add(nextTask);
-              tryLock(nextTask);
-              seenNextTasks.put(task.getId(), nextTask.getId());
-            } else {
+            } catch (TaskExistsException e) {
               log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId());
             }
           }
@@ -343,19 +338,36 @@ public class TaskQueue
         catch (Exception e) {
           log.makeAlert(e, "Failed to commit task")
              .addData("task", task.getId())
-             .addData("statusCode", status.getStatusCode())
+             .addData("statusCode", statusToSave.getStatusCode())
              .emit();
 
-          // TODO -- If this fails, it should enter a retry queue instead of throwing an exception
-          taskStorage.setStatus(task.getId(), TaskStatus.failure(task.getId()).withDuration(status.getDuration()));
+          // Rewrite status
+          statusToSave = TaskStatus.failure(task.getId()).withDuration(statusToSave.getDuration());
         }
       } else {
-        log.info("Not committing %s status for task: %s", status.getStatusCode(), task);
+        log.info("Not committing %s status for task: %s", statusToSave.getStatusCode(), task);
       }
 
-      if (status.isComplete()) {
+      boolean didSetStatus = false;
+
+      try {
+        taskStorage.setStatus(task.getId(), statusToSave);
+        didSetStatus = true;
+      } catch(Exception e) {
+        // TODO: This could be a task-status-submission retry queue instead of retrying the entire task,
+        // TODO: which is heavy and probably not necessary.
+        log.warn(e, "Status could not be persisted! Reinserting task: %s", task.getId());
+
+        log.makeAlert(e, "Failed to persist task status")
+           .addData("task", task.getId())
+           .addData("statusCode", statusToSave.getStatusCode())
+           .emit();
+
+        queue.add(task);
+      }
+
+      if(didSetStatus && statusToSave.isComplete()) {
         unlock(task);
-        seenNextTasks.removeAll(task.getId());
         log.info("Task done: %s", task);
       }
     }
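Note: the net effect of PATCH 4/4 on notify() is an ordering change: decide on a status, run the commit, rewrite the status if the commit fails, persist the status once at the end, and only unlock when that write succeeded (otherwise the task is re-queued). The sketch below is a hypothetical, self-contained condensation of that control flow; the names are stand-ins, not the real TaskQueue API.

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical condensation of the reworked notify() ordering; not the actual TaskQueue code.
    public class NotifyOrderingSketch
    {
      enum Status { RUNNING, SUCCESS, FAILED }

      public static void main(String[] args)
      {
        Queue<String> retryQueue = new ArrayDeque<>();
        String taskId = "example-task";
        Status statusToSave = Status.SUCCESS;      // what we intend to write to storage

        try {
          commit(taskId);                          // commitRunnable.run() in the real code
        }
        catch (Exception e) {
          statusToSave = Status.FAILED;            // rewrite the status instead of persisting twice
        }

        boolean didSetStatus = false;
        try {
          persist(taskId, statusToSave);           // the status write now happens after the commit
          didSetStatus = true;
        }
        catch (Exception e) {
          retryQueue.add(taskId);                  // persistence failed: reinsert the task to run again
        }

        if (didSetStatus && statusToSave != Status.RUNNING) {
          System.out.println("Task done: " + taskId);  // unlock(task) would happen here
        }
      }

      private static void commit(String taskId)
      {
        System.out.println("committed " + taskId);
      }

      private static void persist(String taskId, Status status)
      {
        System.out.println("persisted " + taskId + " -> " + status);
      }
    }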