Introduce availability groups: tasks in the same availability group are never assigned to the same worker.

fjy 2013-06-04 17:12:19 -07:00
parent 42cc87a294
commit 06931ee0f5
24 changed files with 458 additions and 196 deletions

View File

@ -108,7 +108,7 @@ public class IndexingServiceClient
throw new ISE("Cannot find instance of indexingService");
}
return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort());
return String.format("http://%s:%s/mmx/indexer/v1", instance.getAddress(), instance.getPort());
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -27,15 +27,13 @@ import com.metamx.druid.kv.GenericIndexed;
*/
public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
{
private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();
private final ImmutableRTree indexedTree;
public SpatialIndexColumnPartSupplier(
ImmutableRTree indexedTree
)
{
this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree;
this.indexedTree = indexedTree;
}
@Override

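The null guard does not disappear; it moves up a level. IndexIO (next file) now attaches a SpatialIndexColumnPartSupplier only when a spatial index actually exists for the dimension, so the supplier itself can assume a non-null tree.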
View File

@ -692,25 +692,29 @@ public class IndexIO
Map<String, Column> columns = Maps.newHashMap();
for (String dimension : index.getAvailableDimensions()) {
ColumnBuilder builder = new ColumnBuilder()
.setType(ValueType.STRING)
.setHasMultipleValues(true)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
)
)
.setBitmapIndex(
new BitmapIndexColumnPartSupplier(
index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
)
);
if (index.getSpatialIndexes().get(dimension) != null) {
builder.setSpatialIndex(
new SpatialIndexColumnPartSupplier(
index.getSpatialIndexes().get(dimension)
)
);
}
columns.put(
dimension.toLowerCase(),
new ColumnBuilder()
.setType(ValueType.STRING)
.setHasMultipleValues(true)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
)
)
.setBitmapIndex(
new BitmapIndexColumnPartSupplier(
index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
)
).setSpatialIndex(
new SpatialIndexColumnPartSupplier(
index.getSpatialIndexes().get(dimension)
)
).build()
builder.build()
);
}

View File

@ -42,6 +42,9 @@ public abstract class AbstractTask implements Task
@JsonIgnore
private final String groupId;
@JsonIgnore
private final String availabilityGroup;
@JsonIgnore
private final String dataSource;
@ -50,13 +53,14 @@ public abstract class AbstractTask implements Task
protected AbstractTask(String id, String dataSource, Interval interval)
{
this(id, id, dataSource, interval);
this(id, id, id, dataSource, interval);
}
protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.availabilityGroup = Preconditions.checkNotNull(availabilityGroup, "availabilityGroup");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
@ -75,6 +79,13 @@ public abstract class AbstractTask implements Task
return groupId;
}
@JsonProperty
@Override
public String getAvailabilityGroup()
{
return availabilityGroup;
}
@Override
public String getNodeType()
{

View File

@ -41,6 +41,7 @@ import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec;
import com.metamx.druid.shard.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
@ -49,6 +50,16 @@ import java.util.Set;
public class IndexDeterminePartitionsTask extends AbstractTask
{
private static String makeTaskId(String groupId, DateTime start, DateTime end)
{
return String.format(
"%s_partitions_%s_%s",
groupId,
start,
end
);
}
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@ -75,13 +86,9 @@ public class IndexDeterminePartitionsTask extends AbstractTask
)
{
super(
id != null ? id : String.format(
"%s_partitions_%s_%s",
groupId,
interval.getStart(),
interval.getEnd()
),
id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()),
groupId,
makeTaskId(groupId, interval.getStart(), interval.getEnd()),
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);

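Factoring the ID format into makeTaskId() lets the constructor reuse it for the availability group: even when a caller supplies an explicit task id, the availability group is always groupId_partitions_start_end, so two determine-partitions tasks for the same group and interval cannot be assigned to the same worker.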
View File

@ -53,6 +53,17 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class IndexGeneratorTask extends AbstractTask
{
private static String makeTaskId(String groupId, DateTime start, DateTime end, int partitionNum)
{
return String.format(
"%s_generator_%s_%s_%s",
groupId,
start,
end,
partitionNum
);
}
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@ -75,14 +86,11 @@ public class IndexGeneratorTask extends AbstractTask
)
{
super(
id != null ? id : String.format(
"%s_generator_%s_%s_%s",
groupId,
interval.getStart(),
interval.getEnd(),
schema.getShardSpec().getPartitionNum()
),
id != null
? id
: makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()),
groupId,
makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()),
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);
@ -149,10 +157,10 @@ public class IndexGeneratorTask extends AbstractTask
: toolbox.getConfig().getDefaultRowFlushBoundary();
try {
while(firehose.hasMore()) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if(shouldIndex(inputRow)) {
if (shouldIndex(inputRow)) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
throw new NullPointerException(
@ -166,14 +174,15 @@ public class IndexGeneratorTask extends AbstractTask
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if(numRows >= myRowFlushBoundary) {
if (numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else {
metrics.incrementThrownAway();
}
}
} finally {
}
finally {
firehose.close();
}
@ -200,18 +209,21 @@ public class IndexGeneratorTask extends AbstractTask
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true if the row should be indexed, false otherwise
*/
private boolean shouldIndex(InputRow inputRow) {
if(!getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) {
private boolean shouldIndex(InputRow inputRow)
{
if (!getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) {
return false;
}
final Map<String, String> eventDimensions = Maps.newHashMapWithExpectedSize(inputRow.getDimensions().size());
for(final String dim : inputRow.getDimensions()) {
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
if(dimValues.size() == 1) {
if (dimValues.size() == 1) {
eventDimensions.put(dim, Iterables.getOnlyElement(dimValues));
}
}

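The generator task follows the same pattern but includes the partition number in its availability group (groupId_generator_start_end_partitionNum), so generators for different partitions of one interval may share a worker, while two generators for the same partition may not.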
View File

@ -64,6 +64,14 @@ public class RealtimeIndexTask extends AbstractTask
{
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
private static String makeTaskId(String dataSource, int partitionNum, DateTime version)
{
return String.format(
"index_realtime_%s_%d_%s",
dataSource, partitionNum, version
);
}
@JsonIgnore
private final Schema schema;
@ -97,6 +105,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonCreator
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("availabilityGroup") String availabilityGroup,
@JsonProperty("schema") Schema schema,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@ -105,14 +114,14 @@ public class RealtimeIndexTask extends AbstractTask
)
{
super(
id != null ? id : String.format(
"index_realtime_%s_%d_%s",
schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()
),
id != null ? id : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()),
String.format(
"index_realtime_%s",
schema.getDataSource()
),
availabilityGroup != null
? availabilityGroup
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()),
schema.getDataSource(),
null
);

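Note the default here: when availabilityGroup is null, it falls back to makeTaskId(...) with a fresh DateTime, which is effectively unique per instantiation. Replicated realtime tasks that should be spread across workers therefore need to pass the same availabilityGroup explicitly.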
View File

@ -71,6 +71,13 @@ public interface Task
*/
public String getGroupId();
/**
* Returns the availability group ID of this task. Tasks in the same availability group cannot be assigned to the
* same worker. If a task does not need this restriction, a common convention is to set its availability group ID
* to the task ID.
*/
public String getAvailabilityGroup();
/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
*/

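As an illustration, two replicated realtime tasks can be forced onto different workers by giving them the same availability group. A minimal sketch following the constructors used elsewhere in this commit (the ids, the group name, and the trailing nulls standing in for firehose and related realtime wiring are hypothetical placeholders):

Schema schema = new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec());
Task replica1 = new RealtimeIndexTask("rt_foo_1", "rt_foo", schema, null, null, null, null);
Task replica2 = new RealtimeIndexTask("rt_foo_2", "rt_foo", schema, null, null, null, null);
// Both replicas report "rt_foo" from getAvailabilityGroup(), so the runner
// will never place them on the same worker.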
View File

@ -89,8 +89,7 @@ public class VersionConverterTask extends AbstractTask
if (id == null) {
if (segment == null) {
return create(dataSource, interval);
}
else {
} else {
return create(segment);
}
}
@ -105,7 +104,7 @@ public class VersionConverterTask extends AbstractTask
DataSegment segment
)
{
super(id, groupId, dataSource, interval);
super(id, groupId, id, dataSource, interval);
this.segment = segment;
}
@ -206,6 +205,13 @@ public class VersionConverterTask extends AbstractTask
segment.getShardSpec().getPartitionNum()
),
groupId,
joinId(
groupId,
"sub",
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getShardSpec().getPartitionNum()
),
segment.getDataSource(),
segment.getInterval()
);

View File

@ -70,6 +70,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -113,6 +114,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
// tasks that have not yet run
private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue();
// tasks with a retry already scheduled; keeps retryTask idempotent
private final Set<String> tasksToRetry = new ConcurrentSkipListSet<String>();
private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
@ -402,14 +405,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/**
* Retries a task by inserting it back into the pending queue after a given delay.
* This method will also clean up any status paths that were associated with the task.
*
* @param taskRunnerWorkItem - the task to retry
* @param workerId - the worker that was previously running this task
*/
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem, final String workerId)
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem)
{
final String taskId = taskRunnerWorkItem.getTask().getId();
if (tasksToRetry.contains(taskId)) {
return;
}
tasksToRetry.add(taskId);
if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) {
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
@ -419,6 +427,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
public void run()
{
runningTasks.remove(taskId);
tasksToRetry.remove(taskId);
addPendingTask(taskRunnerWorkItem);
}
},
@ -469,7 +478,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.info("Task %s switched from pending to running", taskId);
} else {
// Nothing running this task, announce it in ZK for a worker to run it
zkWorker = findWorkerForTask();
zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask());
if (zkWorker != null) {
announceTask(zkWorker.getWorker(), taskRunnerWorkItem);
}
@ -518,14 +527,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (timeoutStopwatch.elapsedMillis() >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
log.error(
"Something went wrong! %s never ran task %s after %s!",
theWorker.getHost(),
task.getId(),
config.getTaskAssignmentTimeoutDuration()
);
retryTask(runningTasks.get(task.getId()), theWorker.getHost());
retryTask(runningTasks.get(task.getId()));
break;
}
}
@ -546,8 +555,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ZkWorker zkWorker = new ZkWorker(
worker,
statusCache,
jsonMapper
statusCache
);
// Add status listener to the watcher for status changes
@ -568,7 +576,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
// This can fail if a worker writes a bogus status. Retry if so.
if (!taskStatus.getId().equals(taskId)) {
retryTask(runningTasks.get(taskId), worker.getHost());
retryTask(runningTasks.get(taskId));
return;
}
@ -579,6 +587,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
taskId
);
// Wake up any thread waiting on statusLock for this task's status to appear
statusLock.notify();
@ -589,6 +598,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
worker.getHost(),
taskId
);
} else {
zkWorker.addTask(taskRunnerWorkItem);
}
if (taskStatus.isComplete()) {
@ -597,6 +608,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
zkWorker.removeTask(taskRunnerWorkItem);
}
// Worker is done with this task
@ -606,9 +618,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
if (runningTasks.containsKey(taskId)) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
retryTask(runningTasks.get(taskId), worker.getHost());
zkWorker.removeTask(taskRunnerWorkItem);
retryTask(taskRunnerWorkItem);
}
}
}
@ -644,9 +658,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
List<String> tasksToRetry = cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()));
log.info("%s has %d pending tasks to retry", worker.getHost(), tasksToRetry.size());
Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
@ -655,13 +675,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
retryTask(taskRunnerWorkItem, worker.getHost());
retryTask(taskRunnerWorkItem);
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);
}
}
zkWorker.getStatusCache().close();
zkWorker.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -672,7 +692,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
}
}
private ZkWorker findWorkerForTask()
private ZkWorker findWorkerForTask(final Task task)
{
try {
final MinMaxPriorityQueue<ZkWorker> workerQueue = MinMaxPriorityQueue.<ZkWorker>orderedBy(
@ -694,7 +714,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return (!input.isAtCapacity() &&
input.getWorker()
.getVersion()
.compareTo(workerSetupData.get().getMinVersion()) >= 0);
.compareTo(workerSetupData.get().getMinVersion()) >= 0 &&
!input.getAvailabilityGroups().contains(task.getAvailabilityGroup())
);
}
}
)

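The worker-selection predicate now encodes three conditions. Restated as a standalone method for readability (a sketch, not code from this commit): a worker qualifies only if it has spare capacity, meets the minimum version, and is not already running a task from the same availability group.

private boolean isEligible(ZkWorker zkWorker, Task task, String minVersion)
{
  return !zkWorker.isAtCapacity()
         && zkWorker.getWorker().getVersion().compareTo(minVersion) >= 0
         && !zkWorker.getAvailabilityGroups().contains(task.getAvailabilityGroup());
}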
View File

@ -20,19 +20,11 @@
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
@ -44,27 +36,17 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Function<ChildData, String> cacheConverter;
private final Set<String> runningTasks;
private final Set<String> availabilityGroups;
private volatile DateTime lastCompletedTaskTime = new DateTime();
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
public ZkWorker(Worker worker, PathChildrenCache statusCache)
{
this.worker = worker;
this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, String>()
{
@Override
public String apply(@Nullable ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
this.runningTasks = Sets.newHashSet();
this.availabilityGroups = Sets.newHashSet();
}
@JsonProperty
@ -76,17 +58,13 @@ public class ZkWorker implements Closeable
@JsonProperty
public Set<String> getRunningTasks()
{
return Sets.newHashSet(
Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)
);
return runningTasks;
}
public PathChildrenCache getStatusCache()
@JsonProperty
public Set<String> getAvailabilityGroups()
{
return statusCache;
return availabilityGroups;
}
@JsonProperty
@ -98,7 +76,7 @@ public class ZkWorker implements Closeable
@JsonProperty
public boolean isAtCapacity()
{
return statusCache.getCurrentData().size() >= worker.getCapacity();
return runningTasks.size() >= worker.getCapacity();
}
public void setLastCompletedTaskTime(DateTime completedTaskTime)
@ -106,6 +84,18 @@ public class ZkWorker implements Closeable
lastCompletedTaskTime = completedTaskTime;
}
public void addTask(TaskRunnerWorkItem item)
{
runningTasks.add(item.getTask().getId());
availabilityGroups.add(item.getTask().getAvailabilityGroup());
}
public void removeTask(TaskRunnerWorkItem item)
{
runningTasks.remove(item.getTask().getId());
availabilityGroups.remove(item.getTask().getAvailabilityGroup());
}
@Override
public void close() throws IOException
{
@ -117,6 +107,8 @@ public class ZkWorker implements Closeable
{
return "ZkWorker{" +
"worker=" + worker +
", runningTasks=" + runningTasks +
", availabilityGroups=" + availabilityGroups +
", lastCompletedTaskTime=" + lastCompletedTaskTime +
'}';
}

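ZkWorker now keeps runningTasks and availabilityGroups as in-memory sets that RemoteTaskRunner's status listener maintains through addTask() and removeTask(), instead of deserializing the ZK status cache on every call; isAtCapacity() reads the same set. Because findWorkerForTask() never assigns two tasks from one availability group to the same worker, removeTask() can simply drop the task's group from the set.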
View File

@ -345,6 +345,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
)
), "/*", 0
);
root.addFilter(GuiceFilter.class, "/mmx/indexer/v1/*", 0);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0);
initialized = true;

View File

@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
@Path("/mmx/merger/v1")
@Path("/mmx/indexer/v1")
public class IndexerCoordinatorResource
{
private static final Logger log = new Logger(IndexerCoordinatorResource.class);

View File

@ -68,6 +68,7 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
protected void configureServlets()
{
bind(IndexerCoordinatorResource.class);
bind(OldIndexerCoordinatorResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);

View File

@ -0,0 +1,33 @@
package com.metamx.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Path;
/**
*/
@Deprecated
@Path("/mmx/merger/v1")
public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
{
@Inject
public OldIndexerCoordinatorResource(
IndexerCoordinatorConfig config,
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
) throws Exception
{
super(config, emitter, taskMasterLifecycle, taskStorageQueryAdapter, taskLogProvider, configManager, jsonMapper);
}
}

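Taken together with the earlier changes, the coordinator's HTTP API moves from /mmx/merger/v1 to /mmx/indexer/v1: IndexingServiceClient now targets the new prefix, IndexerCoordinatorNode registers GuiceFilter for both prefixes, and this deprecated subclass keeps the old path serving the same resource so existing clients continue to work.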
View File

@ -201,6 +201,7 @@ public class TaskSerdeTest
public void testRealtimeIndexTaskSerde() throws Exception
{
final Task task = new RealtimeIndexTask(
null,
null,
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,

View File

@ -0,0 +1,127 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.actions.LockAcquireAction;
import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.indexing.common.task.AbstractTask;
import org.joda.time.Interval;
import org.junit.Assert;
import java.util.List;
/**
*/
public class RealtimeishTask extends AbstractTask
{
public RealtimeishTask()
{
super("rt1", "rt", "rt1", "foo", null);
}
public RealtimeishTask(String id, String groupId, String availGroup, String dataSource, Interval interval)
{
super(id, groupId, availGroup, dataSource, interval);
}
@Override
public String getType()
{
return "realtime_test";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Interval interval1 = new Interval("2010-01-01T00/PT1H");
final Interval interval2 = new Interval("2010-01-01T01/PT1H");
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("lock1 interval", interval1, lock1.getInterval());
Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1);
// Acquire lock for second interval
final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("lock2 interval", interval2, lock2.getInterval());
Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
// Push first segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.build()
)
)
);
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
// Push second segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.build()
)
)
);
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
// Exit
return TaskStatus.success(getId());
}
}

View File

@ -2,6 +2,7 @@ package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
@ -41,8 +42,10 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static junit.framework.Assert.fail;
@ -245,6 +248,34 @@ public class RemoteTaskRunnerTest
Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue());
}
@Test
public void testRunSameAvailabilityGroup() throws Exception
{
TestRealtimeTask theTask = new TestRealtimeTask("rt1", "rt1", "foo", TaskStatus.running("rt1"));
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", "rt1", "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", "rt2", "foo", TaskStatus.running("rt3"))
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().isEmpty()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
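This test pins down the new scheduling behavior: rt2 shares availability group "rt1" with the already-running rt1, so it remains pending, while rt3 (availability group "rt2") runs alongside rt1.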
private void makeTaskMonitor() throws Exception
{
WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator(
@ -323,6 +354,7 @@ public class RemoteTaskRunnerTest
Executors.newSingleThreadExecutor()
);
jsonMapper.registerSubtypes(new NamedType(TestTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
}

View File

@ -273,90 +273,6 @@ public class TaskLifecycleTest
@Test
public void testRealtimeishTask() throws Exception
{
class RealtimeishTask extends AbstractTask
{
RealtimeishTask()
{
super("rt1", "rt", "foo", null);
}
@Override
public String getType()
{
return "realtime_test";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Interval interval1 = new Interval("2010-01-01T00/PT1H");
final Interval interval2 = new Interval("2010-01-01T01/PT1H");
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("lock1 interval", interval1, lock1.getInterval());
Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1);
// Acquire lock for second interval
final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("lock2 interval", interval2, lock2.getInterval());
Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
// Push first segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.build()
)
)
);
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
// Push second segment
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.build()
)
)
);
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
// Exit
return TaskStatus.success(getId());
}
}
final Task rtishTask = new RealtimeishTask();
final TaskStatus status = runTask(rtishTask);
@ -368,7 +284,7 @@ public class TaskLifecycleTest
@Test
public void testSimple() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()
@ -405,7 +321,7 @@ public class TaskLifecycleTest
@Test
public void testBadInterval() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()
@ -439,7 +355,7 @@ public class TaskLifecycleTest
@Test
public void testBadVersion() throws Exception
{
final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public String getType()

View File

@ -346,7 +346,7 @@ public class TaskQueueTest
private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval)
{
return new AbstractTask(id, groupId, dataSource, interval)
return new AbstractTask(id, groupId, id, dataSource, interval)
{
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
@ -370,7 +370,7 @@ public class TaskQueueTest
final List<Task> nextTasks
)
{
return new AbstractTask(id, groupId, dataSource, interval)
return new AbstractTask(id, groupId, id, dataSource, interval)
{
@Override
public String getType()

View File

@ -0,0 +1,78 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
/**
*/
@JsonTypeName("test_realtime")
public class TestRealtimeTask extends RealtimeIndexTask
{
private final TaskStatus status;
@JsonCreator
public TestRealtimeTask(
@JsonProperty("id") String id,
@JsonProperty("availabilityGroup") String availGroup,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status
)
{
super(
id,
availGroup,
new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
null,
null
);
this.status = status;
}
@Override
@JsonProperty
public String getType()
{
return "test_realtime";
}
@JsonProperty
public TaskStatus getStatus()
{
return status;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return status;
}
}

View File

@ -334,7 +334,7 @@ public class SimpleResourceManagementStrategyTest
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null, null);
super(new Worker("host", "ip", 3, "version"), null);
this.testTask = testTask;
}

View File

@ -24,7 +24,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.4.33-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -346,11 +346,16 @@ public class InfoResource
@Path("/rules/{dataSourceName}")
@Produces("application/json")
public Response getDatasourceRules(
@PathParam("dataSourceName") final String dataSourceName
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("full") final String full
)
{
return Response.status(Response.Status.OK)
.entity(databaseRuleManager.getRules(dataSourceName))
if (full != null) {
return Response.ok(databaseRuleManager.getRulesWithDefault(dataSourceName))
.build();
}
return Response.ok(databaseRuleManager.getRules(dataSourceName))
.build();
}
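With this change, GET /rules/{dataSourceName} keeps returning only the rules stored for that datasource, while passing any value for the full query parameter also folds in the cluster defaults via getRulesWithDefault(). A hypothetical request, assuming the resource is mounted under /info and a datasource named wikipedia:

GET /info/rules/wikipedia?full=true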