Merge branch 'task-stuff' of github.com:metamx/druid into task-stuff

This commit is contained in:
Fangjin Yang 2013-02-01 12:59:04 -08:00
commit d252059e4a
7 changed files with 115 additions and 44 deletions

View File

@@ -19,8 +19,11 @@
package com.metamx.druid.client.cache; package com.metamx.druid.client.cache;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@@ -48,6 +51,14 @@ public interface Cache
this.key = key; this.key = key;
} }
public byte[] toByteArray() {
final byte[] nsBytes = this.namespace.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(Ints.BYTES + nsBytes.length + this.key.length)
.putInt(nsBytes.length)
.put(nsBytes)
.put(this.key).array();
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {

View File

@@ -20,9 +20,10 @@
package com.metamx.druid.client.cache; package com.metamx.druid.client.cache;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import net.iharder.base64.Base64; import com.google.common.primitives.Ints;
import net.spy.memcached.AddrUtil; import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm; import net.spy.memcached.DefaultHashAlgorithm;
@@ -31,9 +32,12 @@ import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture; import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.transcoders.SerializingTranscoder; import net.spy.memcached.transcoders.SerializingTranscoder;
import org.apache.commons.codec.digest.DigestUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@@ -62,6 +66,7 @@ public class MemcachedCache implements Cache
.build(), .build(),
AddrUtil.getAddresses(config.getHosts()) AddrUtil.getAddresses(config.getHosts())
), ),
config.getMemcachedPrefix(),
config.getTimeout(), config.getTimeout(),
config.getExpiration() config.getExpiration()
); );
@@ -72,6 +77,7 @@ public class MemcachedCache implements Cache
private final int timeout; private final int timeout;
private final int expiration; private final int expiration;
private final String memcachedPrefix;
private final MemcachedClientIF client; private final MemcachedClientIF client;
@@ -79,10 +85,15 @@ public class MemcachedCache implements Cache
private final AtomicLong missCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0); private final AtomicLong timeoutCount = new AtomicLong(0);
MemcachedCache(MemcachedClientIF client, int timeout, int expiration) { MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
"memcachedPrefix length [%d] exceeds maximum length [%d]",
memcachedPrefix.length(),
MAX_PREFIX_LENGTH);
this.timeout = timeout; this.timeout = timeout;
this.expiration = expiration; this.expiration = expiration;
this.client = client; this.client = client;
this.memcachedPrefix = memcachedPrefix;
} }
@Override @Override
@@ -101,7 +112,7 @@ public class MemcachedCache implements Cache
@Override @Override
public byte[] get(NamedKey key) public byte[] get(NamedKey key)
{ {
Future<Object> future = client.asyncGet(computeKeyString(key)); Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
try { try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS); byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) { if(bytes != null) {
@@ -110,7 +121,7 @@ public class MemcachedCache implements Cache
else { else {
missCount.incrementAndGet(); missCount.incrementAndGet();
} }
return bytes; return bytes == null ? null : deserializeValue(key, bytes);
} }
catch(TimeoutException e) { catch(TimeoutException e) {
timeoutCount.incrementAndGet(); timeoutCount.incrementAndGet();
@@ -129,7 +140,30 @@ public class MemcachedCache implements Cache
@Override @Override
public void put(NamedKey key, byte[] value) public void put(NamedKey key, byte[] value)
{ {
client.set(computeKeyString(key), expiration, value); client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
}
private static byte[] serializeValue(NamedKey key, byte[] value) {
byte[] keyBytes = key.toByteArray();
return ByteBuffer.allocate(Ints.BYTES + keyBytes.length + value.length)
.putInt(keyBytes.length)
.put(keyBytes)
.put(value)
.array();
}
private static byte[] deserializeValue(NamedKey key, byte[] bytes) {
ByteBuffer buf = ByteBuffer.wrap(bytes);
final int keyLength = buf.getInt();
byte[] keyBytes = new byte[keyLength];
buf.get(keyBytes);
byte[] value = new byte[buf.remaining()];
buf.get(value);
Preconditions.checkState(Arrays.equals(keyBytes, key.toByteArray()),
"Keys do not match, possible hash collision?");
return value;
} }
@Override @Override
@@ -144,7 +178,7 @@ public class MemcachedCache implements Cache
@Nullable NamedKey input @Nullable NamedKey input
) )
{ {
return computeKeyString(input); return computeKeyHash(memcachedPrefix, input);
} }
} }
); );
@@ -163,9 +197,11 @@ public class MemcachedCache implements Cache
Map<NamedKey, byte[]> results = Maps.newHashMap(); Map<NamedKey, byte[]> results = Maps.newHashMap();
for(Map.Entry<String, Object> entry : some.entrySet()) { for(Map.Entry<String, Object> entry : some.entrySet()) {
final NamedKey key = keyLookup.get(entry.getKey());
final byte[] value = (byte[]) entry.getValue();
results.put( results.put(
keyLookup.get(entry.getKey()), key,
(byte[])entry.getValue() value == null ? null : deserializeValue(key, value)
); );
} }
@@ -186,7 +222,15 @@ public class MemcachedCache implements Cache
// no resources to cleanup // no resources to cleanup
} }
private static String computeKeyString(NamedKey key) { public static final int MAX_PREFIX_LENGTH =
return key.namespace + ":" + Base64.encodeBytes(key.key, Base64.DONT_BREAK_LINES); MemcachedClientIF.MAX_KEY_LENGTH
- 40 // length of namespace hash
- 40 // length of key hash
- 2 // length of separators
;
private static String computeKeyHash(String memcachedPrefix, NamedKey key) {
// hash keys to keep things under 250 characters for memcached
return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
} }
} }

View File

@@ -18,4 +18,7 @@ public abstract class MemcachedCacheConfig
@Config("${prefix}.maxObjectSize") @Config("${prefix}.maxObjectSize")
public abstract int getMaxObjectSize(); public abstract int getMaxObjectSize();
@Config("${prefix}.memcachedPrefix")
public abstract String getMemcachedPrefix();
} }

View File

@@ -26,7 +26,7 @@ import org.junit.Test;
/** /**
*/ */
public class MapCacheBrokerTest public class MapCacheTest
{ {
private static final byte[] HI = "hi".getBytes(); private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes(); private static final byte[] HO = "ho".getBytes();

View File

@@ -17,7 +17,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark public class MemcachedCacheBenchmark extends SimpleBenchmark
{ {
private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_"; private static final String BASE_KEY = "test_2012-11-26T00:00:00.000Z_2012-11-27T00:00:00.000Z_2012-11-27T04:11:25.979Z_";
public static final String NAMESPACE = "default"; public static final String NAMESPACE = "default";
@@ -56,6 +56,7 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
cache = new MemcachedCache( cache = new MemcachedCache(
client, client,
"druid-memcached-benchmark",
30000, // 30 seconds 30000, // 30 seconds
3600 // 1 hour 3600 // 1 hour
); );
@@ -113,6 +114,6 @@ public class MemcachedCacheBrokerBenchmark extends SimpleBenchmark
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Runner.main(MemcachedCacheBrokerBenchmark.class, args); Runner.main(MemcachedCacheBenchmark.class, args);
} }
} }

View File

@@ -50,7 +50,7 @@ import java.util.concurrent.TimeoutException;
/** /**
*/ */
public class MemcachedCacheBrokerTest public class MemcachedCacheTest
{ {
private static final byte[] HI = "hi".getBytes(); private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes(); private static final byte[] HO = "ho".getBytes();
@@ -60,7 +60,7 @@ public class MemcachedCacheBrokerTest
public void setUp() throws Exception public void setUp() throws Exception
{ {
MemcachedClientIF client = new MockMemcachedClient(); MemcachedClientIF client = new MockMemcachedClient();
cache = new MemcachedCache(client, 500, 3600); cache = new MemcachedCache(client, "druid-memcached-test", 500, 3600);
} }
@Test @Test

View File

@@ -73,7 +73,6 @@ public class TaskQueue
{ {
private final List<Task> queue = Lists.newLinkedList(); private final List<Task> queue = Lists.newLinkedList();
private final Map<String, NavigableMap<Interval, TaskGroup>> running = Maps.newHashMap(); private final Map<String, NavigableMap<Interval, TaskGroup>> running = Maps.newHashMap();
private final Multimap<String, String> seenNextTasks = HashMultimap.create();
private final TaskStorage taskStorage; private final TaskStorage taskStorage;
@@ -199,6 +198,11 @@ public class TaskQueue
queue.add(task); queue.add(task);
workMayBeAvailable.signalAll(); workMayBeAvailable.signalAll();
// Attempt to add this task to a running task group. Silently continue if this is not possible.
// The main reason this is here is so when subtasks are added, they end up in the same task group
// as their parent whenever possible.
tryLock(task);
return true; return true;
} }
finally { finally {
@@ -274,26 +278,26 @@ public class TaskQueue
* Finally, if this task is not supposed to be running, this method will simply do nothing. * Finally, if this task is not supposed to be running, this method will simply do nothing.
* *
* @param task task to update * @param task task to update
* @param status new task status * @param originalStatus new task status
* @param commitRunnable operation to perform if this task is ready to commit * @param commitRunnable operation to perform if this task is ready to commit
* *
* @throws NullPointerException if task or status is null * @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status ID * @throws IllegalArgumentException if the task ID does not match the status ID
* @throws IllegalStateException if this queue is currently shut down * @throws IllegalStateException if this queue is currently shut down
*/ */
public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable) public void notify(final Task task, final TaskStatus originalStatus, final Runnable commitRunnable)
{ {
giant.lock(); giant.lock();
try { try {
Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(originalStatus, "status");
Preconditions.checkState(active, "Queue is not active!"); Preconditions.checkState(active, "Queue is not active!");
Preconditions.checkArgument( Preconditions.checkArgument(
task.getId().equals(status.getId()), task.getId().equals(originalStatus.getId()),
"Mismatching task ids[%s/%s]", "Mismatching task ids[%s/%s]",
task.getId(), task.getId(),
status.getId() originalStatus.getId()
); );
final TaskGroup taskGroup; final TaskGroup taskGroup;
@@ -306,20 +310,13 @@ public class TaskQueue
taskGroup = maybeTaskGroup.get(); taskGroup = maybeTaskGroup.get();
} }
// Update status in DB // This is what we want to write to the DB when we're done.
// TODO: We can either do this first, in which case we run the risk of having a task marked done in the DB but // Not final, since we might need to reassign the var later if the commitRunnable fails.
// TODO: not committed here; or we can do it last, in which case we run the risk of having a task marked running TaskStatus statusToSave = originalStatus;
// TODO: in the DB but committed here. Currently, we err on the former side because we don't want a ticking time
// TODO: bomb in the DB (a task marked running that we have forgotten about, which will potentially be re-
// TODO: started much later when a coordinator bootstraps).
// TODO:
// TODO: Eventually we should have this status update enter a retry queue instead of throwing an exception
// TODO: if it fails.
taskStorage.setStatus(task.getId(), status);
// Should we commit? // Should we commit?
if (taskGroup.getCommitStyle().shouldCommit(task, status)) { if (taskGroup.getCommitStyle().shouldCommit(task, statusToSave)) {
log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId()); log.info("Committing %s status for task: %s", statusToSave.getStatusCode(), task.getId());
// Add next tasks // Add next tasks
try { try {
@@ -330,12 +327,10 @@ public class TaskQueue
// We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over. // We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over.
// So, we need to remember which ones we've already spawned and not do them again. // So, we need to remember which ones we've already spawned and not do them again.
for (final Task nextTask : status.getNextTasks()) { for (final Task nextTask : statusToSave.getNextTasks()) {
if (!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) { try {
add(nextTask); add(nextTask);
tryLock(nextTask); } catch (TaskExistsException e) {
seenNextTasks.put(task.getId(), nextTask.getId());
} else {
log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId()); log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId());
} }
} }
@@ -343,19 +338,36 @@ public class TaskQueue
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to commit task") log.makeAlert(e, "Failed to commit task")
.addData("task", task.getId()) .addData("task", task.getId())
.addData("statusCode", status.getStatusCode()) .addData("statusCode", statusToSave.getStatusCode())
.emit(); .emit();
// TODO -- If this fails, it should enter a retry queue instead of throwing an exception // Rewrite status
taskStorage.setStatus(task.getId(), TaskStatus.failure(task.getId()).withDuration(status.getDuration())); statusToSave = TaskStatus.failure(task.getId()).withDuration(statusToSave.getDuration());
} }
} else { } else {
log.info("Not committing %s status for task: %s", status.getStatusCode(), task); log.info("Not committing %s status for task: %s", statusToSave.getStatusCode(), task);
} }
if (status.isComplete()) { boolean didSetStatus = false;
try {
taskStorage.setStatus(task.getId(), statusToSave);
didSetStatus = true;
} catch(Exception e) {
// TODO: This could be a task-status-submission retry queue instead of retrying the entire task,
// TODO: which is heavy and probably not necessary.
log.warn(e, "Status could not be persisted! Reinserting task: %s", task.getId());
log.makeAlert(e, "Failed to persist task status")
.addData("task", task.getId())
.addData("statusCode", statusToSave.getStatusCode())
.emit();
queue.add(task);
}
if(didSetStatus && statusToSave.isComplete()) {
unlock(task); unlock(task);
seenNextTasks.removeAll(task.getId());
log.info("Task done: %s", task); log.info("Task done: %s", task);
} }
} }