diff --git a/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java index e09a609a7bd..9ba070b5333 100644 --- a/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java +++ b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java @@ -108,7 +108,7 @@ public class IndexingServiceClient throw new ISE("Cannot find instance of indexingService"); } - return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort()); + return String.format("http://%s:%s/mmx/indexer/v1", instance.getAddress(), instance.getPort()); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/indexing-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java b/indexing-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java index 660d1cb3561..246da53ab7b 100644 --- a/indexing-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java +++ b/indexing-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java @@ -27,15 +27,13 @@ import com.metamx.druid.kv.GenericIndexed; */ public class SpatialIndexColumnPartSupplier implements Supplier { - private static final ImmutableRTree EMPTY_SET = new ImmutableRTree(); - private final ImmutableRTree indexedTree; public SpatialIndexColumnPartSupplier( ImmutableRTree indexedTree ) { - this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree; + this.indexedTree = indexedTree; } @Override diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index d9dc25a7f23..24f6750785f 100644 --- a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -692,25 +692,29 @@ public class IndexIO Map columns = Maps.newHashMap(); for (String dimension : index.getAvailableDimensions()) { + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.STRING) + .setHasMultipleValues(true) + .setDictionaryEncodedColumn( + new DictionaryEncodedColumnSupplier( + index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension)) + ) + ) + .setBitmapIndex( + new BitmapIndexColumnPartSupplier( + index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension) + ) + ); + if (index.getSpatialIndexes().get(dimension) != null) { + builder.setSpatialIndex( + new SpatialIndexColumnPartSupplier( + index.getSpatialIndexes().get(dimension) + ) + ); + } columns.put( dimension.toLowerCase(), - new ColumnBuilder() - .setType(ValueType.STRING) - .setHasMultipleValues(true) - .setDictionaryEncodedColumn( - new DictionaryEncodedColumnSupplier( - index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension)) - ) - ) - .setBitmapIndex( - new BitmapIndexColumnPartSupplier( - index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension) - ) - ).setSpatialIndex( - new SpatialIndexColumnPartSupplier( - index.getSpatialIndexes().get(dimension) - ) - ).build() + builder.build() ); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java index 1afa9589002..d1f87243f51 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java @@ -42,6 +42,9 @@ public abstract class AbstractTask implements Task @JsonIgnore private final String groupId; + @JsonIgnore + private final String availabilityGroup; + @JsonIgnore private final String dataSource; @@ -50,13 +53,14 @@ public abstract class AbstractTask implements Task protected AbstractTask(String id, String dataSource, Interval interval) { - this(id, id, dataSource, interval); + this(id, id, id, dataSource, interval); } - protected AbstractTask(String id, String groupId, String dataSource, Interval interval) + protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = Preconditions.checkNotNull(groupId, "groupId"); + this.availabilityGroup = Preconditions.checkNotNull(availabilityGroup, "availabilityGroup"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.interval = Optional.fromNullable(interval); } @@ -75,6 +79,13 @@ public abstract class AbstractTask implements Task return groupId; } + @JsonProperty + @Override + public String getAvailabilityGroup() + { + return availabilityGroup; + } + @Override public String getNodeType() { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java index bf2c1424a6a..7c7d8707a37 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java @@ -41,6 +41,7 @@ import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.SingleDimensionShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.List; @@ -49,6 +50,16 @@ import java.util.Set; public class IndexDeterminePartitionsTask extends AbstractTask { + private static String makeTaskId(String groupId, DateTime start, DateTime end) + { + return String.format( + "%s_partitions_%s_%s", + groupId, + start, + end + ); + } + @JsonIgnore private final FirehoseFactory firehoseFactory; @@ -75,13 +86,9 @@ public class IndexDeterminePartitionsTask extends AbstractTask ) { super( - id != null ? id : String.format( - "%s_partitions_%s_%s", - groupId, - interval.getStart(), - interval.getEnd() - ), + id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()), groupId, + makeTaskId(groupId, interval.getStart(), interval.getEnd()), schema.getDataSource(), Preconditions.checkNotNull(interval, "interval") ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexGeneratorTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexGeneratorTask.java index 607ac138dc9..b6adaa0bd8e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexGeneratorTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexGeneratorTask.java @@ -53,6 +53,17 @@ import java.util.concurrent.CopyOnWriteArrayList; public class IndexGeneratorTask extends AbstractTask { + private static String makeTaskId(String groupId, DateTime start, DateTime end, int partitionNum) + { + return String.format( + "%s_generator_%s_%s_%s", + groupId, + start, + end, + partitionNum + ); + } + @JsonIgnore private final FirehoseFactory firehoseFactory; @@ -75,14 +86,11 @@ public class IndexGeneratorTask extends AbstractTask ) { super( - id != null ? id : String.format( - "%s_generator_%s_%s_%s", - groupId, - interval.getStart(), - interval.getEnd(), - schema.getShardSpec().getPartitionNum() - ), + id != null + ? id + : makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()), groupId, + makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()), schema.getDataSource(), Preconditions.checkNotNull(interval, "interval") ); @@ -149,10 +157,10 @@ public class IndexGeneratorTask extends AbstractTask : toolbox.getConfig().getDefaultRowFlushBoundary(); try { - while(firehose.hasMore()) { + while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); - if(shouldIndex(inputRow)) { + if (shouldIndex(inputRow)) { final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); if (sink == null) { throw new NullPointerException( @@ -166,14 +174,15 @@ public class IndexGeneratorTask extends AbstractTask int numRows = sink.add(inputRow); metrics.incrementProcessed(); - if(numRows >= myRowFlushBoundary) { + if (numRows >= myRowFlushBoundary) { plumber.persist(firehose.commit()); } } else { metrics.incrementThrownAway(); } } - } finally { + } + finally { firehose.close(); } @@ -200,18 +209,21 @@ public class IndexGeneratorTask extends AbstractTask /** * Should we index this inputRow? Decision is based on our interval and shardSpec. + * * @param inputRow the row to check + * * @return true or false */ - private boolean shouldIndex(InputRow inputRow) { - if(!getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) { + private boolean shouldIndex(InputRow inputRow) + { + if (!getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) { return false; } final Map eventDimensions = Maps.newHashMapWithExpectedSize(inputRow.getDimensions().size()); - for(final String dim : inputRow.getDimensions()) { + for (final String dim : inputRow.getDimensions()) { final List dimValues = inputRow.getDimension(dim); - if(dimValues.size() == 1) { + if (dimValues.size() == 1) { eventDimensions.put(dim, Iterables.getOnlyElement(dimValues)); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index 010c0ebea59..f3298f5cd82 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -64,6 +64,14 @@ public class RealtimeIndexTask extends AbstractTask { private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); + private static String makeTaskId(String dataSource, int partitionNum, DateTime version) + { + return String.format( + "index_realtime_%s_%d_%s", + dataSource, partitionNum, version + ); + } + @JsonIgnore private final Schema schema; @@ -97,6 +105,7 @@ public class RealtimeIndexTask extends AbstractTask @JsonCreator public RealtimeIndexTask( @JsonProperty("id") String id, + @JsonProperty("availabilityGroup") String availabilityGroup, @JsonProperty("schema") Schema schema, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @@ -105,14 +114,14 @@ public class RealtimeIndexTask extends AbstractTask ) { super( - id != null ? id : String.format( - "index_realtime_%s_%d_%s", - schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime() - ), + id != null ? id : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()), String.format( "index_realtime_%s", schema.getDataSource() ), + availabilityGroup != null + ? availabilityGroup + : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()), schema.getDataSource(), null ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java index ef7b2f02265..6272669cda5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java @@ -71,6 +71,13 @@ public interface Task */ public String getGroupId(); + /** + * Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same + * worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the + * task ID. + */ + public String getAvailabilityGroup(); + /** * Returns a descriptive label for this task type. Used for metrics emission and logging. */ diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java index 8fa80a13179..27e46f9ce42 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java @@ -89,8 +89,7 @@ public class VersionConverterTask extends AbstractTask if (id == null) { if (segment == null) { return create(dataSource, interval); - } - else { + } else { return create(segment); } } @@ -105,7 +104,7 @@ public class VersionConverterTask extends AbstractTask DataSegment segment ) { - super(id, groupId, dataSource, interval); + super(id, groupId, id, dataSource, interval); this.segment = segment; } @@ -206,6 +205,13 @@ public class VersionConverterTask extends AbstractTask segment.getShardSpec().getPartitionNum() ), groupId, + joinId( + groupId, + "sub", + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getShardSpec().getPartitionNum() + ), segment.getDataSource(), segment.getInterval() ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index e396158fdc1..0c5ec760c7e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -70,6 +70,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -113,6 +114,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue(); // tasks that have not yet run private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue(); + // idempotent task retry + private final Set tasksToRetry = new ConcurrentSkipListSet(); private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor(); @@ -402,14 +405,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider /** * Retries a task by inserting it back into the pending queue after a given delay. - * This method will also clean up any status paths that were associated with the task. * * @param taskRunnerWorkItem - the task to retry - * @param workerId - the worker that was previously running this task */ - private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem, final String workerId) + private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem) { final String taskId = taskRunnerWorkItem.getTask().getId(); + + if (tasksToRetry.contains(taskId)) { + return; + } + + tasksToRetry.add(taskId); + if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) { log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId); scheduledExec.schedule( @@ -419,6 +427,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider public void run() { runningTasks.remove(taskId); + tasksToRetry.remove(taskId); addPendingTask(taskRunnerWorkItem); } }, @@ -469,7 +478,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider log.info("Task %s switched from pending to running", taskId); } else { // Nothing running this task, announce it in ZK for a worker to run it - zkWorker = findWorkerForTask(); + zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); if (zkWorker != null) { announceTask(zkWorker.getWorker(), taskRunnerWorkItem); } @@ -518,14 +527,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider synchronized (statusLock) { while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) { statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); - if (timeoutStopwatch.elapsedMillis() >= config.getTaskAssignmentTimeoutDuration().getMillis()) { + if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) { log.error( "Something went wrong! %s never ran task %s after %s!", theWorker.getHost(), task.getId(), config.getTaskAssignmentTimeoutDuration() ); - retryTask(runningTasks.get(task.getId()), theWorker.getHost()); + retryTask(runningTasks.get(task.getId())); break; } } @@ -546,8 +555,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final ZkWorker zkWorker = new ZkWorker( worker, - statusCache, - jsonMapper + statusCache ); // Add status listener to the watcher for status changes @@ -568,7 +576,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider // This can fail if a worker writes a bogus status. Retry if so. if (!taskStatus.getId().equals(taskId)) { - retryTask(runningTasks.get(taskId), worker.getHost()); + retryTask(runningTasks.get(taskId)); return; } @@ -579,6 +587,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider taskId ); + // Synchronizing state with ZK statusLock.notify(); @@ -589,6 +598,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider worker.getHost(), taskId ); + } else { + zkWorker.addTask(taskRunnerWorkItem); } if (taskStatus.isComplete()) { @@ -597,6 +608,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (result != null) { ((SettableFuture) result).set(taskStatus); } + zkWorker.removeTask(taskRunnerWorkItem); } // Worker is done with this task @@ -606,9 +618,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - if (runningTasks.containsKey(taskId)) { + TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); + if (taskRunnerWorkItem != null) { log.info("Task %s just disappeared!", taskId); - retryTask(runningTasks.get(taskId), worker.getHost()); + zkWorker.removeTask(taskRunnerWorkItem); + retryTask(taskRunnerWorkItem); } } } @@ -644,9 +658,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ZkWorker zkWorker = zkWorkers.get(worker.getHost()); if (zkWorker != null) { try { - List tasksToRetry = cf.getChildren() - .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())); - log.info("%s has %d pending tasks to retry", worker.getHost(), tasksToRetry.size()); + Set tasksToRetry = Sets.newHashSet( + cf.getChildren() + .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) + ); + tasksToRetry.addAll( + cf.getChildren() + .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost())) + ); + log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size()); for (String taskId : tasksToRetry) { TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); @@ -655,13 +675,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (cf.checkExists().forPath(taskPath) != null) { cf.delete().guaranteed().forPath(taskPath); } - retryTask(taskRunnerWorkItem, worker.getHost()); + retryTask(taskRunnerWorkItem); } else { log.warn("RemoteTaskRunner has no knowledge of task %s", taskId); } } - zkWorker.getStatusCache().close(); + zkWorker.close(); } catch (Exception e) { throw Throwables.propagate(e); @@ -672,7 +692,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } } - private ZkWorker findWorkerForTask() + private ZkWorker findWorkerForTask(final Task task) { try { final MinMaxPriorityQueue workerQueue = MinMaxPriorityQueue.orderedBy( @@ -694,7 +714,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider return (!input.isAtCapacity() && input.getWorker() .getVersion() - .compareTo(workerSetupData.get().getMinVersion()) >= 0); + .compareTo(workerSetupData.get().getMinVersion()) >= 0 && + !input.getAvailabilityGroups().contains(task.getAvailabilityGroup()) + ); } } ) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java index a27fc6b95b5..41cda473986 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java @@ -20,19 +20,11 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.worker.Worker; -import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; - import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Set; @@ -44,27 +36,17 @@ public class ZkWorker implements Closeable { private final Worker worker; private final PathChildrenCache statusCache; - private final Function cacheConverter; + private final Set runningTasks; + private final Set availabilityGroups; private volatile DateTime lastCompletedTaskTime = new DateTime(); - public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) + public ZkWorker(Worker worker, PathChildrenCache statusCache) { this.worker = worker; this.statusCache = statusCache; - this.cacheConverter = new Function() - { - @Override - public String apply(@Nullable ChildData input) - { - try { - return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; + this.runningTasks = Sets.newHashSet(); + this.availabilityGroups = Sets.newHashSet(); } @JsonProperty @@ -76,17 +58,13 @@ public class ZkWorker implements Closeable @JsonProperty public Set getRunningTasks() { - return Sets.newHashSet( - Lists.transform( - statusCache.getCurrentData(), - cacheConverter - ) - ); + return runningTasks; } - public PathChildrenCache getStatusCache() + @JsonProperty + public Set getAvailabilityGroups() { - return statusCache; + return availabilityGroups; } @JsonProperty @@ -98,7 +76,7 @@ public class ZkWorker implements Closeable @JsonProperty public boolean isAtCapacity() { - return statusCache.getCurrentData().size() >= worker.getCapacity(); + return runningTasks.size() >= worker.getCapacity(); } public void setLastCompletedTaskTime(DateTime completedTaskTime) @@ -106,6 +84,18 @@ public class ZkWorker implements Closeable lastCompletedTaskTime = completedTaskTime; } + public void addTask(TaskRunnerWorkItem item) + { + runningTasks.add(item.getTask().getId()); + availabilityGroups.add(item.getTask().getAvailabilityGroup()); + } + + public void removeTask(TaskRunnerWorkItem item) + { + runningTasks.remove(item.getTask().getId()); + availabilityGroups.remove(item.getTask().getAvailabilityGroup()); + } + @Override public void close() throws IOException { @@ -117,6 +107,8 @@ public class ZkWorker implements Closeable { return "ZkWorker{" + "worker=" + worker + + ", runningTasks=" + runningTasks + + ", availabilityGroups=" + availabilityGroups + ", lastCompletedTaskTime=" + lastCompletedTaskTime + '}'; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index bfcd09fdaa1..69801809c1d 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -345,6 +345,7 @@ public class IndexerCoordinatorNode extends QueryableNode locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); + + // (Confirm lock sanity) + Assert.assertEquals("lock1 interval", interval1, lock1.getInterval()); + Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1); + + // Acquire lock for second interval + final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); + final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); + + // (Confirm lock sanity) + Assert.assertEquals("lock2 interval", interval2, lock2.getInterval()); + Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2); + + // Push first segment + toolbox.getTaskActionClient() + .submit( + new SegmentInsertAction( + ImmutableSet.of( + DataSegment.builder() + .dataSource("foo") + .interval(interval1) + .version(lock1.getVersion()) + .build() + ) + ) + ); + + // Release first lock + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); + final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction()); + + // (Confirm lock sanity) + Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3); + + // Push second segment + toolbox.getTaskActionClient() + .submit( + new SegmentInsertAction( + ImmutableSet.of( + DataSegment.builder() + .dataSource("foo") + .interval(interval2) + .version(lock2.getVersion()) + .build() + ) + ) + ); + + // Release second lock + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); + final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction()); + + // (Confirm lock sanity) + Assert.assertEquals("locks4", ImmutableList.of(), locks4); + + // Exit + return TaskStatus.success(getId()); + } +} diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index bdbc99519cc..9b0d879e28d 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -2,6 +2,7 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; @@ -41,8 +42,10 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static junit.framework.Assert.fail; @@ -245,6 +248,34 @@ public class RemoteTaskRunnerTest Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue()); } + + @Test + public void testRunSameAvailabilityGroup() throws Exception + { + TestRealtimeTask theTask = new TestRealtimeTask("rt1", "rt1", "foo", TaskStatus.running("rt1")); + remoteTaskRunner.run(theTask); + remoteTaskRunner.run( + new TestRealtimeTask("rt2", "rt1", "foo", TaskStatus.running("rt2")) + ); + remoteTaskRunner.run( + new TestRealtimeTask("rt3", "rt2", "foo", TaskStatus.running("rt3")) + ); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (remoteTaskRunner.getRunningTasks().isEmpty()) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + + Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); + } + + private void makeTaskMonitor() throws Exception { WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator( @@ -323,6 +354,7 @@ public class RemoteTaskRunnerTest Executors.newSingleThreadExecutor() ); jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test")); + jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime")); workerTaskMonitor.start(); } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index 0a4795eb212..ef3c6412c6f 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -273,90 +273,6 @@ public class TaskLifecycleTest @Test public void testRealtimeishTask() throws Exception { - class RealtimeishTask extends AbstractTask - { - RealtimeishTask() - { - super("rt1", "rt", "foo", null); - } - - @Override - public String getType() - { - return "realtime_test"; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - final Interval interval1 = new Interval("2010-01-01T00/PT1H"); - final Interval interval2 = new Interval("2010-01-01T01/PT1H"); - - // Sort of similar to what realtime tasks do: - - // Acquire lock for first interval - final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); - final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); - - // (Confirm lock sanity) - Assert.assertEquals("lock1 interval", interval1, lock1.getInterval()); - Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1); - - // Acquire lock for second interval - final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); - final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); - - // (Confirm lock sanity) - Assert.assertEquals("lock2 interval", interval2, lock2.getInterval()); - Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2); - - // Push first segment - toolbox.getTaskActionClient() - .submit( - new SegmentInsertAction( - ImmutableSet.of( - DataSegment.builder() - .dataSource("foo") - .interval(interval1) - .version(lock1.getVersion()) - .build() - ) - ) - ); - - // Release first lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); - final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction()); - - // (Confirm lock sanity) - Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3); - - // Push second segment - toolbox.getTaskActionClient() - .submit( - new SegmentInsertAction( - ImmutableSet.of( - DataSegment.builder() - .dataSource("foo") - .interval(interval2) - .version(lock2.getVersion()) - .build() - ) - ) - ); - - // Release second lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); - final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction()); - - // (Confirm lock sanity) - Assert.assertEquals("locks4", ImmutableList.of(), locks4); - - // Exit - return TaskStatus.success(getId()); - } - } - final Task rtishTask = new RealtimeishTask(); final TaskStatus status = runTask(rtishTask); @@ -368,7 +284,7 @@ public class TaskLifecycleTest @Test public void testSimple() throws Exception { - final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() @@ -405,7 +321,7 @@ public class TaskLifecycleTest @Test public void testBadInterval() throws Exception { - final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() @@ -439,7 +355,7 @@ public class TaskLifecycleTest @Test public void testBadVersion() throws Exception { - final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java index d3bc9d95541..4e023b736dd 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java @@ -346,7 +346,7 @@ public class TaskQueueTest private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval) { - return new AbstractTask(id, groupId, dataSource, interval) + return new AbstractTask(id, groupId, id, dataSource, interval) { @Override public TaskStatus run(TaskToolbox toolbox) throws Exception @@ -370,7 +370,7 @@ public class TaskQueueTest final List nextTasks ) { - return new AbstractTask(id, groupId, dataSource, interval) + return new AbstractTask(id, groupId, id, dataSource, interval) { @Override public String getType() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java new file mode 100644 index 00000000000..515b75e3e1f --- /dev/null +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java @@ -0,0 +1,78 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.indexing.common.TaskStatus; +import com.metamx.druid.indexing.common.TaskToolbox; +import com.metamx.druid.indexing.common.task.RealtimeIndexTask; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.shard.NoneShardSpec; + +/** + */ +@JsonTypeName("test_realtime") +public class TestRealtimeTask extends RealtimeIndexTask +{ + private final TaskStatus status; + + @JsonCreator + public TestRealtimeTask( + @JsonProperty("id") String id, + @JsonProperty("availabilityGroup") String availGroup, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("taskStatus") TaskStatus status + ) + { + super( + id, + availGroup, + new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()), + null, + null, + null, + null + ); + this.status = status; + } + + @Override + @JsonProperty + public String getType() + { + return "test_realtime"; + } + + @JsonProperty + public TaskStatus getStatus() + { + return status; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + return status; + } +} diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index c44d555f798..0a4b56fffcd 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -334,7 +334,7 @@ public class SimpleResourceManagementStrategyTest Task testTask ) { - super(new Worker("host", "ip", 3, "version"), null, null); + super(new Worker("host", "ip", 3, "version"), null); this.testTask = testTask; } diff --git a/pom.xml b/pom.xml index 364ed88e001..aa2aaeff5de 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ com.metamx druid pom - 0.4.33-SNAPSHOT + 0.5.0-SNAPSHOT druid druid diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index 00474205f40..3b14436c7d1 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -346,11 +346,16 @@ public class InfoResource @Path("/rules/{dataSourceName}") @Produces("application/json") public Response getDatasourceRules( - @PathParam("dataSourceName") final String dataSourceName + @PathParam("dataSourceName") final String dataSourceName, + @QueryParam("full") final String full + ) { - return Response.status(Response.Status.OK) - .entity(databaseRuleManager.getRules(dataSourceName)) + if (full != null) { + return Response.ok(databaseRuleManager.getRulesWithDefault(dataSourceName)) + .build(); + } + return Response.ok(databaseRuleManager.getRules(dataSourceName)) .build(); }