Merge branch 'killsegments' into task-stuff

Conflicts:
	merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java
	merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
	merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
	merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
	merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java
	merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java
	merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
	merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
	merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java
	merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
	merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
This commit is contained in:
Gian Merlino 2013-01-25 11:30:10 -08:00
commit e6a618ca76
31 changed files with 904 additions and 163 deletions

View File

@ -19,7 +19,6 @@
package com.metamx.druid.client;
import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.phonebook.PhoneBook;
@ -27,12 +26,13 @@ import com.metamx.phonebook.PhoneBookPeon;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class ServerInventoryManager extends InventoryManager<DruidServer>
{
private static final Map<String, Integer> removedSegments = Maps.newHashMap();
private static final Map<String, Integer> removedSegments = new ConcurrentHashMap<String, Integer>();
public ServerInventoryManager(
ServerInventoryManagerConfig config,

View File

@ -0,0 +1,41 @@
package com.metamx.druid.merge;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.Interval;
/**
*/
public class ClientKillQuery
{
private final String dataSource;
private final Interval interval;
@JsonCreator
public ClientKillQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.dataSource = dataSource;
this.interval = interval;
}
@JsonProperty
public String getType()
{
return "kill";
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
}

View File

@ -51,7 +51,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.2.15</version>
<version>1.3.27</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.druid.loading.S3SegmentPuller;
import com.metamx.druid.loading.S3SegmentGetterConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
@ -43,6 +44,7 @@ public class TaskToolbox
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final SegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final ObjectMapper objectMapper;
public TaskToolbox(
@ -50,6 +52,7 @@ public class TaskToolbox
ServiceEmitter emitter,
RestS3Service s3Client,
SegmentPusher segmentPusher,
SegmentKiller segmentKiller,
ObjectMapper objectMapper
)
{
@ -57,6 +60,7 @@ public class TaskToolbox
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.segmentKiller = segmentKiller;
this.objectMapper = objectMapper;
}
@ -80,6 +84,11 @@ public class TaskToolbox
return segmentPusher;
}
public SegmentKiller getSegmentKiller()
{
return segmentKiller;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;

View File

@ -29,7 +29,6 @@ import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexAdapter;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.IndexableAdapter;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@ -37,12 +36,10 @@ import com.metamx.druid.merger.coordinator.TaskContext;
import com.metamx.druid.shard.NoneShardSpec;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.util.ArrayList;
public class DeleteTask extends AbstractTask
{

View File

@ -20,7 +20,6 @@
package com.metamx.druid.merger.common.task;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@ -43,16 +42,18 @@ import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class IndexDeterminePartitionsTask extends AbstractTask
{
@JsonProperty private final FirehoseFactory firehoseFactory;
@JsonProperty private final Schema schema;
@JsonProperty private final long targetPartitionSize;
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final Schema schema;
@JsonProperty
private final long targetPartitionSize;
private static final Logger log = new Logger(IndexTask.class);
@ -109,24 +110,24 @@ public class IndexDeterminePartitionsTask extends AbstractTask
final Firehose firehose = firehoseFactory.connect();
try {
while(firehose.hasMore()) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if(getInterval().contains(inputRow.getTimestampFromEpoch())) {
if (getInterval().contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
if(!unusableDimensions.contains(dim)) {
if (!unusableDimensions.contains(dim)) {
if(dimValues.size() == 1) {
if (dimValues.size() == 1) {
// Track this value
TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
if(dimensionValueMultiset == null) {
if (dimensionValueMultiset == null) {
dimensionValueMultiset = TreeMultiset.create();
dimensionValueMultisets.put(dim, dimensionValueMultiset);
}
@ -147,7 +148,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
}
} finally {
}
finally {
firehose.close();
}
@ -167,7 +169,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
};
if(dimensionValueMultisets.isEmpty()) {
if (dimensionValueMultisets.isEmpty()) {
// No suitable partition dimension. We'll make one big segment and hope for the best.
log.info("No suitable partition dimension found");
shardSpecs.add(new NoneShardSpec());
@ -189,9 +191,9 @@ public class IndexDeterminePartitionsTask extends AbstractTask
// Iterate over unique partition dimension values in sorted order
String currentPartitionStart = null;
int currentPartitionSize = 0;
for(final String partitionDimValue : partitionDimValues.elementSet()) {
for (final String partitionDimValue : partitionDimValues.elementSet()) {
currentPartitionSize += partitionDimValues.count(partitionDimValue);
if(currentPartitionSize >= targetPartitionSize) {
if (currentPartitionSize >= targetPartitionSize) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
@ -229,24 +231,25 @@ public class IndexDeterminePartitionsTask extends AbstractTask
return TaskStatus.success(getId()).withNextTasks(
Lists.transform(
shardSpecs, new Function<ShardSpec, Task>()
{
@Override
public Task apply(ShardSpec shardSpec)
{
return new IndexGeneratorTask(
getGroupId(),
getInterval(),
firehoseFactory,
new Schema(
schema.getDataSource(),
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
)
);
}
}
shardSpecs,
new Function<ShardSpec, Task>()
{
@Override
public Task apply(ShardSpec shardSpec)
{
return new IndexGeneratorTask(
getGroupId(),
getInterval(),
firehoseFactory,
new Schema(
schema.getDataSource(),
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
)
);
}
}
)
);
}

View File

@ -0,0 +1,61 @@
package com.metamx.druid.merger.common.task;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Set;
/**
*/
public class KillTask extends AbstractTask
{
private static final Logger log = new Logger(KillTask.class);
@JsonCreator
public KillTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
String.format(
"kill_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
dataSource,
interval
);
}
@Override
public Type getType()
{
return Task.Type.KILL;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
// Kill segments
Set<DataSegment> segmentsToKill = ImmutableSet.copyOf(
toolbox.getSegmentKiller()
.kill(getDataSource(), getInterval())
);
return TaskStatus.success(getId(), segmentsToKill);
}
}

View File

@ -34,6 +34,7 @@ import org.joda.time.Interval;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
@ -46,7 +47,8 @@ public interface Task
MERGE,
APPEND,
DELETE,
TEST
TEST,
KILL
}
public String getId();

View File

@ -191,4 +191,24 @@ public class MergerDBCoordinator
}
}
}
public void deleteSegment(final DataSegment segment)
{
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
).bind("id", segment.getIdentifier())
.execute();
return null;
}
}
);
}
}

View File

@ -428,27 +428,27 @@ public class RemoteTaskRunner implements TaskRunner
statusLock.notify();
final TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper == null) {
log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
worker.getHost(),
taskId
);
} else {
final TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
}
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
final TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper == null) {
log.warn(
"WTF?! Worker[%s] completed a task I didn't know about: %s",
worker.getHost(),
taskId
);
} else {
final TaskCallback callback = taskWrapper.getCallback();
// Cleanup
if (callback != null) {
callback.notify(taskStatus);
}
tasks.remove(taskId);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
}
tasks.remove(taskId);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
}
}
}

View File

@ -51,18 +51,18 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Interface between task producers and task consumers.
*
* <p/>
* The queue accepts tasks from producers using {@link #add} and delivers tasks to consumers using either
* {@link #take} or {@link #poll}. Ordering is mostly-FIFO, with deviations when the natural next task would conflict
* with a currently-running task. In that case, tasks are skipped until a runnable one is found.
*
* <p/>
* To manage locking, the queue keeps track of currently-running tasks as {@link TaskGroup} objects. The idea is that
* only one TaskGroup can be running on a particular dataSource + interval, and that TaskGroup has a single version
* string that all tasks in the group must use to publish segments. Tasks in the same TaskGroup may run concurrently.
*
* <p/>
* For persistence, the queue saves new tasks from {@link #add} and task status updates from {@link #notify} using a
* {@link TaskStorage} object.
*
* <p/>
* To support leader election of our containing system, the queue can be stopped (in which case it will not accept
* any new tasks, or hand out any more tasks, until started again).
*/
@ -116,13 +116,13 @@ public class TaskQueue
}
};
for(final VersionedTaskWrapper taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) {
for (final VersionedTaskWrapper taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) {
final Task task = taskAndVersion.getTask();
final String preferredVersion = taskAndVersion.getVersion();
queue.add(task);
if(preferredVersion != null) {
if (preferredVersion != null) {
final Optional<String> version = tryLock(task, Optional.of(preferredVersion));
log.info(
@ -164,7 +164,8 @@ public class TaskQueue
running.clear();
active = false;
} finally {
}
finally {
giant.unlock();
}
}
@ -173,6 +174,7 @@ public class TaskQueue
* Adds some work to the queue and the underlying task storage facility with a generic "running" status.
*
* @param task task to add
*
* @return true
*/
public boolean add(final Task task)
@ -190,13 +192,15 @@ public class TaskQueue
workMayBeAvailable.signalAll();
return true;
} finally {
}
finally {
giant.unlock();
}
}
/**
* Locks and returns next doable work from the queue. Blocks if there is no doable work.
*
* @return runnable task
*/
public VersionedTaskWrapper take() throws InterruptedException
@ -206,19 +210,21 @@ public class TaskQueue
try {
VersionedTaskWrapper taskWrapper;
while((taskWrapper = poll()) == null) {
while ((taskWrapper = poll()) == null) {
log.info("Waiting for work...");
workMayBeAvailable.await();
}
return taskWrapper;
} finally {
}
finally {
giant.unlock();
}
}
/**
* Locks and removes next doable work from the queue. Returns null if there is no doable work.
*
* @return runnable task or null
*/
public VersionedTaskWrapper poll()
@ -227,9 +233,9 @@ public class TaskQueue
try {
log.info("Checking for doable work");
for(final Task task : queue) {
for (final Task task : queue) {
final Optional<String> maybeVersion = tryLock(task);
if(maybeVersion.isPresent()) {
if (maybeVersion.isPresent()) {
Preconditions.checkState(active, "wtf? Found task when inactive");
taskStorage.setVersion(task.getId(), maybeVersion.get());
queue.remove(task);
@ -256,15 +262,16 @@ public class TaskQueue
* the task storage facility, and any nextTasks present in the status will be created. If the status is a completed
* status, the task will be unlocked and no further updates will be accepted. If this task has failed, the task group
* it is part of will be terminated.
*
* <p/>
* Finally, if this task is not supposed to be running, this method will simply do nothing.
*
* @param task task to update
* @param status new task status
* @param task task to update
* @param status new task status
* @param commitRunnable operation to perform if this task is ready to commit
* @throws NullPointerException if task or status is null
*
* @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status ID
* @throws IllegalStateException if this queue is currently shut down
* @throws IllegalStateException if this queue is currently shut down
*/
public void notify(final Task task, final TaskStatus status, final Runnable commitRunnable)
{
@ -284,7 +291,7 @@ public class TaskQueue
final TaskGroup taskGroup;
final Optional<TaskGroup> maybeTaskGroup = findTaskGroupForTask(task);
if(!maybeTaskGroup.isPresent()) {
if (!maybeTaskGroup.isPresent()) {
log.info("Ignoring notification for dead task: %s", task.getId());
return;
} else {
@ -303,20 +310,20 @@ public class TaskQueue
taskStorage.setStatus(task.getId(), status);
// Should we commit?
if(taskGroup.getCommitStyle().shouldCommit(task, status)) {
if (taskGroup.getCommitStyle().shouldCommit(task, status)) {
log.info("Committing %s status for task: %s", status.getStatusCode(), task.getId());
// Add next tasks
try {
if(commitRunnable != null) {
if (commitRunnable != null) {
log.info("Running commitRunnable for task: %s", task.getId());
commitRunnable.run();
}
// We want to allow tasks to submit RUNNING statuses with the same nextTasks over and over.
// So, we need to remember which ones we've already spawned and not do them again.
for(final Task nextTask : status.getNextTasks()) {
if(!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) {
for (final Task nextTask : status.getNextTasks()) {
if (!seenNextTasks.containsEntry(task.getId(), nextTask.getId())) {
add(nextTask);
tryLock(nextTask);
seenNextTasks.put(task.getId(), nextTask.getId());
@ -324,7 +331,8 @@ public class TaskQueue
log.info("Already added followup task %s to original task: %s", nextTask.getId(), task.getId());
}
}
} catch(Exception e) {
}
catch (Exception e) {
log.makeAlert(e, "Failed to commit task")
.addData("task", task.getId())
.addData("statusCode", status.getStatusCode())
@ -337,12 +345,13 @@ public class TaskQueue
log.info("Not committing %s status for task: %s", status.getStatusCode(), task);
}
if(status.isComplete()) {
if (status.isComplete()) {
unlock(task);
seenNextTasks.removeAll(task.getId());
log.info("Task done: %s", task);
}
} finally {
}
finally {
giant.unlock();
}
}
@ -352,6 +361,7 @@ public class TaskQueue
* running.
*
* @param task task to unlock
*
* @throws IllegalStateException if task is not currently locked
*/
private void unlock(final Task task)
@ -364,7 +374,7 @@ public class TaskQueue
final TaskGroup taskGroup;
final Optional<TaskGroup> maybeTaskGroup = findTaskGroupForTask(task);
if(maybeTaskGroup.isPresent()) {
if (maybeTaskGroup.isPresent()) {
taskGroup = maybeTaskGroup.get();
} else {
throw new IllegalStateException(String.format("Task must be running: %s", task.getId()));
@ -374,12 +384,12 @@ public class TaskQueue
log.info("Removing task[%s] from TaskGroup[%s]", task.getId(), taskGroup.getGroupId());
taskGroup.remove(task.getId());
if(taskGroup.size() == 0) {
if (taskGroup.size() == 0) {
log.info("TaskGroup complete: %s", taskGroup);
running.get(dataSource).remove(taskGroup.getInterval());
}
if(running.get(dataSource).size() == 0) {
if (running.get(dataSource).size() == 0) {
running.remove(dataSource);
}
@ -394,6 +404,7 @@ public class TaskQueue
* Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
*
* @param task task to attempt to lock
*
* @return lock version if lock was acquired, absent otherwise
*/
private Optional<String> tryLock(final Task task)
@ -404,8 +415,9 @@ public class TaskQueue
/**
* Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
*
* @param task task to attempt to lock
* @param task task to attempt to lock
* @param preferredVersion use this version if possible (no guarantees, though!)
*
* @return lock version if lock was acquired, absent otherwise
*/
private Optional<String> tryLock(final Task task, final Optional<String> preferredVersion)
@ -449,7 +461,7 @@ public class TaskQueue
final String version;
if(preferredVersion.isPresent()) {
if (preferredVersion.isPresent()) {
// We have a preferred version. Since this is a private method, we'll trust our caller to not break our
// ordering assumptions and just use it.
version = preferredVersion.get();
@ -478,7 +490,8 @@ public class TaskQueue
return Optional.of(taskGroupToUse.getVersion());
} finally {
}
finally {
giant.unlock();
}
@ -509,14 +522,15 @@ public class TaskQueue
)
);
if(maybeTaskGroup.size() == 1) {
if (maybeTaskGroup.size() == 1) {
return Optional.of(maybeTaskGroup.get(0));
} else if(maybeTaskGroup.size() == 0) {
} else if (maybeTaskGroup.size() == 0) {
return Optional.absent();
} else {
throw new IllegalStateException(String.format("WTF?! Task %s is in multiple task groups!", task.getId()));
}
} finally {
}
finally {
giant.unlock();
}
}
@ -530,7 +544,7 @@ public class TaskQueue
try {
final NavigableMap<Interval, TaskGroup> dsRunning = running.get(dataSource);
if(dsRunning == null) {
if (dsRunning == null) {
// No locks at all
return Collections.emptyList();
} else {

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.exec;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
@ -36,6 +37,9 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.List;
import java.util.Set;
public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
@ -141,7 +145,8 @@ public class TaskConsumer implements Runnable
try {
preflightStatus = task.preflight(context);
log.info("Preflight done for task: %s", task.getId());
} catch(Exception e) {
}
catch (Exception e) {
preflightStatus = TaskStatus.failure(task.getId());
log.error(e, "Exception thrown during preflight for task: %s", task.getId());
}
@ -177,45 +182,16 @@ public class TaskConsumer implements Runnable
public void run()
{
try {
// Publish returned segments
// TODO -- Publish in transaction
if(statusFromRunner.getSegments().size() > 0) {
for (DataSegment segment : statusFromRunner.getSegments()) {
if (!task.getDataSource().equals(segment.getDataSource())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid dataSource: %s",
task.getId(),
segment.getIdentifier()
)
);
}
if (!task.getInterval().contains(segment.getInterval())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid interval: %s",
task.getId(),
segment.getIdentifier()
)
);
}
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.announceHistoricalSegment(segment);
}
// TODO -- Publish in transaction
publishSegments(task, context, statusFromRunner.getSegments());
}
} catch(Exception e) {
if(statusFromRunner.getSegmentsNuked().size() > 0) {
deleteSegments(task, context, statusFromRunner.getSegmentsNuked());
}
}
catch (Exception e) {
log.error(e, "Exception while publishing segments for task: %s", task);
throw Throwables.propagate(e);
}
@ -262,4 +238,57 @@ public class TaskConsumer implements Runnable
}
);
}
private void deleteSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception
{
for (DataSegment segment : segments) {
verifySegment(task, context, segment);
log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.deleteSegment(segment);
}
}
private void publishSegments(Task task, TaskContext context, Set<DataSegment> segments) throws Exception
{
for (DataSegment segment : segments) {
verifySegment(task, context, segment);
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId());
mergerDBCoordinator.announceHistoricalSegment(segment);
}
}
private void verifySegment(Task task, TaskContext context, DataSegment segment)
{
if (!task.getDataSource().equals(segment.getDataSource())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid dataSource: %s",
task.getId(),
segment.getIdentifier()
)
);
}
if (!task.getInterval().contains(segment.getInterval())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid interval: %s",
task.getId(),
segment.getIdentifier()
)
);
}
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
}
}

View File

@ -47,6 +47,8 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
@ -129,6 +131,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
private List<Monitor> monitors = null;
private ServiceEmitter emitter = null;
private DbConnectorConfig dbConnectorConfig = null;
private DBI dbi = null;
private IndexerCoordinatorConfig config = null;
private TaskToolbox taskToolbox = null;
private MergerDBCoordinator mergerDBCoordinator = null;
@ -207,6 +211,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeEmitter();
initializeMonitors();
initializeDB();
initializeIndexerCoordinatorConfig();
initializeMergeDBCoordinator();
initializeTaskToolbox();
@ -387,6 +392,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
private void initializeDB()
{
if (dbConnectorConfig == null) {
dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
}
if (dbi == null) {
dbi = new DbConnector(dbConnectorConfig).getDBI();
}
}
private void initializeIndexerCoordinatorConfig()
{
if (config == null) {
@ -408,18 +423,23 @@ public class IndexerCoordinatorNode extends RegisteringNode
configFactory.build(S3SegmentPusherConfig.class),
jsonMapper
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, jsonMapper);
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}
}
public void initializeMergeDBCoordinator()
{
if (mergerDBCoordinator == null) {
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
mergerDBCoordinator = new MergerDBCoordinator(
jsonMapper,
dbConnectorConfig,
new DbConnector(dbConnectorConfig).getDBI()
dbi
);
}
}

View File

@ -111,16 +111,14 @@ public class TaskMonitor
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
taskStatus = task.run(
taskContext, toolbox, new TaskCallback()
taskStatus = task.run(taskContext, toolbox, new TaskCallback()
{
@Override
public void notify(TaskStatus status)
{
workerCuratorCoordinator.announceStatus(status);
workerCuratorCoordinator.updateStatus(status);
}
}
);
});
}
catch (Exception e) {
log.makeAlert(e, "Failed to run task")

View File

@ -29,11 +29,15 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -69,6 +73,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.io.IOException;
import java.util.Arrays;
@ -293,7 +298,15 @@ public class WorkerNode extends RegisteringNode
configFactory.build(S3SegmentPusherConfig.class),
jsonMapper
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, jsonMapper);
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}
}

View File

@ -191,10 +191,7 @@ public class RemoteTaskRunnerTest
cf.create().creatingParentsIfNeeded().forPath(
String.format("%s/worker1/task1", statusPath),
jsonMapper.writeValueAsBytes(
TaskStatus.success(
"task1",
ImmutableSet.<DataSegment>of()
)
TaskStatus.success("task1")
)
);
@ -327,7 +324,7 @@ public class RemoteTaskRunnerTest
{
return null;
}
}, null, null, null, jsonMapper
}, null, null, null, null, jsonMapper
),
Executors.newSingleThreadExecutor()
);
@ -504,7 +501,7 @@ public class RemoteTaskRunnerTest
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
return TaskStatus.success("task1", ImmutableSet.<DataSegment>of());
return TaskStatus.success("task1");
}
}
}

View File

@ -189,7 +189,11 @@ public class DatabaseRuleManager
public Map<String, List<Rule>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT dataSource, payload FROM %s", config.getRuleTable())
// Return latest version rule by dataSource
String.format(
"SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
config.getRuleTable()
)
).fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Map<String, Object>>()

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -45,6 +46,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -343,6 +345,44 @@ public class DatabaseSegmentManager
return dataSources.get().values();
}
public Collection<String> getAllDatasourceNames()
{
synchronized (lock) {
return dbi.withHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT DISTINCT(datasource) FROM %s", config.getSegmentTable())
)
.fold(
Lists.<String>newArrayList(),
new Folder3<ArrayList<String>, Map<String, Object>>()
{
@Override
public ArrayList<String> fold(
ArrayList<String> druidDataSources,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
druidDataSources.add(
MapUtils.getString(stringObjectMap, "datasource")
);
return druidDataSources;
}
}
);
}
}
);
}
}
public void poll()
{
try {

View File

@ -548,10 +548,14 @@ public class InfoResource
@Path("/db/datasources")
@Produces("application/json")
public Response getDatabaseDataSources(
@QueryParam("full") String full
@QueryParam("full") String full,
@QueryParam("includeDisabled") String includeDisabled
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (includeDisabled != null) {
return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build();
}
if (full != null) {
return builder.entity(databaseSegmentManager.getInventory()).build();
}

View File

@ -190,7 +190,9 @@ public class MasterMain
emitter,
scheduledExecutorFactory,
new ConcurrentHashMap<String, LoadQueuePeon>(),
serviceProvider
serviceProvider,
httpClient,
new ToStringResponseHandler(Charsets.UTF_8)
);
lifecycle.addManagedInstance(master);

View File

@ -21,13 +21,13 @@ package com.metamx.druid.http;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.merge.ClientKillQuery;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.util.List;
@ -111,6 +111,15 @@ public class MasterResource
return resp;
}
@POST
@Path("/kill")
@Consumes("application/json")
public Response killSegments(ClientKillQuery killQuery)
{
master.killSegments(killQuery);
return Response.ok().build();
}
@GET
@Path("/loadstatus")
@Produces("application/json")

View File

@ -0,0 +1,122 @@
package com.metamx.druid.loading;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnectorConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.joda.time.Interval;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
*/
public class S3SegmentKiller implements SegmentKiller
{
private static final Logger log = new Logger(S3SegmentKiller.class);
private final RestS3Service s3Client;
private final DBI dbi;
private final DbConnectorConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3SegmentKiller(
RestS3Service s3Client,
DBI dbi,
DbConnectorConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.dbi = dbi;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public List<DataSegment> kill(final String datasource, final Interval interval) throws ServiceException
{
List<DataSegment> matchingSegments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
config.getSegmentTable()
)
)
.bind("dataSource", datasource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.fold(
Lists.<DataSegment>newArrayList(),
new Folder3<List<DataSegment>, Map<String, Object>>()
{
@Override
public List<DataSegment> fold(
List<DataSegment> accumulator,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(
(String) stringObjectMap.get("payload"),
DataSegment.class
);
accumulator.add(segment);
return accumulator;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval);
for (final DataSegment segment : matchingSegments) {
// Remove from S3
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
return matchingSegments;
}
}

View File

@ -0,0 +1,14 @@
package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.joda.time.Interval;
import java.util.List;
/**
*/
public interface SegmentKiller
{
public List<DataSegment> kill(String datasource, Interval interval) throws ServiceException;
}

View File

@ -21,7 +21,7 @@ package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -36,7 +36,6 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
@ -44,9 +43,12 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.merge.ClientKillQuery;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.phonebook.PhoneBook;
import com.metamx.phonebook.PhoneBookPeon;
import com.netflix.curator.x.discovery.ServiceProvider;
@ -55,8 +57,8 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -89,6 +91,8 @@ public class DruidMaster
private final PhoneBookPeon masterPeon;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ServiceProvider serviceProvider;
private final HttpClient httpClient;
private final HttpResponseHandler<StringBuilder, String> responseHandler;
private final ObjectMapper jsonMapper;
@ -103,7 +107,9 @@ public class DruidMaster
ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory,
Map<String, LoadQueuePeon> loadManagementPeons,
ServiceProvider serviceProvider
ServiceProvider serviceProvider,
HttpClient httpClient,
HttpResponseHandler<StringBuilder, String> responseHandler
)
{
this.config = config;
@ -124,6 +130,9 @@ public class DruidMaster
this.loadManagementPeons = loadManagementPeons;
this.serviceProvider = serviceProvider;
this.httpClient = httpClient;
this.responseHandler = responseHandler;
}
public boolean isClusterMaster()
@ -199,6 +208,27 @@ public class DruidMaster
databaseSegmentManager.enableDatasource(ds);
}
public void killSegments(ClientKillQuery killQuery)
{
try {
httpClient.post(
new URL(
String.format(
"http://%s:%s/mmx/merger/v1/index",
serviceProvider.getInstance().getAddress(),
serviceProvider.getInstance().getPort()
)
)
)
.setContent("application/json", jsonMapper.writeValueAsBytes(killQuery))
.go(responseHandler)
.get();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{
final DruidServer fromServer = serverInventoryManager.getInventoryValue(from);
@ -564,7 +594,6 @@ public class DruidMaster
DruidMasterRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(databaseSegmentManager.getInventory())
.withLoadManagementPeons(loadManagementPeons)
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
.withEmitter(emitter)
.withMergeBytesLimit(config.getMergeBytesLimit())
@ -666,6 +695,7 @@ public class DruidMaster
decrementRemovedSegmentsLifetime();
return params.buildFromExisting()
.withLoadManagementPeons(loadManagementPeons)
.withDruidCluster(cluster)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
@ -688,7 +718,14 @@ public class DruidMaster
super(
ImmutableList.of(
new DruidMasterSegmentInfoLoader(DruidMaster.this),
new DruidMasterSegmentMerger(jsonMapper, serviceProvider),
new DruidMasterSegmentMerger(
new HttpMergerClient(
httpClient,
responseHandler,
jsonMapper,
serviceProvider
)
),
new DruidMasterHelper()
{
@Override

View File

@ -0,0 +1,3 @@
#select_datasource {
margin: 20px 0 20px 0;
}

View File

@ -0,0 +1,75 @@
<!DOCTYPE HTML>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<html xmlns="http://www.w3.org/1999/html">
<head>
<title>Druid Master Console - Enable/Disable Datasources</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/jquery-ui-1.9.2.css";</style>
<style type="text/css">@import "css/enable.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery-ui-1.9.2.js"></script>
<script type="text/javascript" src="js/enable-0.0.1.js"></script>
</head>
<body>
<div class="container">
<div class="heading">Enable/Disable Datasources</div>
<div>
<h4>Enabled Datasources:</h4>
<ul id="enabled_datasources">
</ul>
</div>
<div>
<h4>Disabled Datasources:</h4>
<ul id="disabled_datasources">
</ul>
</div>
<div id="select_datasource">
Select Data Source:
<select id="datasources">
<option></option>
</select>
</div>
<div>
<button type="button" id="enable">Enable</button>
<div id="enable_dialog" title="Confirm Enable">
<p>Are you sure you want to enable the selected datasource?</p>
</div>
<button type="button" id="disable">Disable</button>
<div id="disable_dialog" title="Confirm Cancel">
<p>Are you sure you want to disable the selected datasource?</p>
</div>
<div id="error_dialog" title="Error!"></div>
</div>
</div>
</body>
</html>

View File

@ -20,20 +20,29 @@
<html>
<head>
<title>Druid Master Console</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<title>Druid Master Console</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/style.css";</style>
</head>
<body>
<div class="container">
<a href="view.html">View Information about the Cluster</a>
<br/>
<a href="rules.html">Configure Compute Node Rules</a>
<div>
<a href="view.html">View Information about the Cluster</a>
</div>
<div>
<a href="rules.html">Configure Compute Node Rules</a>
</div>
<div>
<a href="enable.html">Enable/Disable Datasources</a>
</div>
<div>
<a href="kill.html">Permanent Segment Deletion</a>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,97 @@
$(document).ready(function() {
$("button").button();
$("#error_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Ok : function() {
$(this).dialog("close");
}
}
});
$("#enable_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#enable_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#enable_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$("#disable_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#disable_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#disable_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$.getJSON("/info/db/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.each(db_datasources, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});
});
});
$("#enable").click(function() {
$("#enable_dialog").dialog("open");
});
$('#disable').click(function (){
$("#disable_dialog").dialog("open")
});
});

View File

@ -0,0 +1,58 @@
$(document).ready(function() {
$("button").button();
$("#error_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Ok : function() {
$(this).dialog("close");
}
}
});
$("#confirm_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
var interval = $('#interval').val();
var toSend = {
"dataSource" : selected,
"interval" : interval
}
$.ajax({
type: 'POST',
url:'/master/kill',
data: JSON.stringify(toSend),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#confirm_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#confirm_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$.getJSON("/info/db/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});
});
$("#confirm").click(function() {
$("#confirm_dialog").dialog("open");
});
});

View File

@ -0,0 +1,61 @@
<!DOCTYPE HTML>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<html xmlns="http://www.w3.org/1999/html">
<head>
<title>Druid Master Console - Enable/Disable Datasources</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/jquery-ui-1.9.2.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery-ui-1.9.2.js"></script>
<script type="text/javascript" src="js/kill-0.0.1.js"></script>
</head>
<body>
<div class="container">
<div class="heading">Permanently Delete Segments</div>
<div id="select_datasource">
Select Data Source:
<select id="datasources">
<option></option>
</select>
</div>
<p>Interval:</p>
<span><input type="text" name="interval" id="interval"/></span>
<div>
<button type="button" id="confirm">Confirm</button>
<div id="confirm_dialog" title="Confirm">
<p>Are you sure you want delete segments for this datasource and range? There is no going back!</p>
</div>
<div id="error_dialog" title="Error!"></div>
</div>
</div>
</body>
</html>

View File

@ -134,6 +134,8 @@ public class DruidMasterTest
new NoopServiceEmitter(),
scheduledExecutorFactory,
loadManagementPeons,
null,
null,
null
);
}