Merge branch 'forking-task-runner' into stop_task

Conflicts:
	merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java
	merger/src/main/java/com/metamx/druid/merger/common/task/Task.java
	merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
commit d53822130b
Author: Fangjin Yang
Date:   2013-03-19 12:23:20 -07:00
50 changed files with 1479 additions and 629 deletions

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -24,11 +24,11 @@
   <artifactId>druid-services</artifactId>
   <name>druid-services</name>
   <description>druid-services</description>
-  <version>0.3.24-SNAPSHOT</version>
+  <version>0.3.26-SNAPSHOT</version>
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <modules>

View File

@@ -9,7 +9,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid-examples</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -9,7 +9,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid-examples</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>
 
   <dependencies>

View File

@@ -1,25 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012  Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
- */
-
-package com.metamx.druid.merger.common;
-
-public interface TaskCallback
-{
-  public void notify(TaskStatus status);
-}

View File

@@ -28,15 +28,17 @@ public class LocalTaskActionClient implements TaskActionClient
     final RetType ret = taskAction.perform(task, toolbox);
 
-    // Add audit log
-    try {
-      storage.addAuditLog(task, taskAction);
-    }
-    catch (Exception e) {
-      log.makeAlert(e, "Failed to record action in audit log")
-         .addData("task", task.getId())
-         .addData("actionClass", taskAction.getClass().getName())
-         .emit();
+    if (taskAction.isAudited()) {
+      // Add audit log
+      try {
+        storage.addAuditLog(task, taskAction);
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Failed to record action in audit log")
+           .addData("task", task.getId())
+           .addData("actionClass", taskAction.getClass().getName())
+           .emit();
+      }
     }
 
     return ret;

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Throwables;
@@ -10,6 +11,7 @@ import org.joda.time.Interval;
 
 public class LockAcquireAction implements TaskAction<TaskLock>
 {
+  @JsonIgnore
   private final Interval interval;
 
   @JsonCreator
@@ -44,6 +46,12 @@ public class LockAcquireAction implements TaskAction<TaskLock>
     }
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,12 +1,8 @@
 package com.metamx.druid.merger.common.actions;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.task.Task;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
 
 import java.util.List;
@@ -23,6 +19,12 @@ public class LockListAction implements TaskAction<List<TaskLock>>
     return toolbox.getTaskLockbox().findLocksForTask(task);
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.metamx.druid.merger.common.task.Task;
@@ -8,6 +9,7 @@ import org.joda.time.Interval;
 
 public class LockReleaseAction implements TaskAction<Void>
 {
+  @JsonIgnore
   private final Interval interval;
 
   @JsonCreator
@@ -36,6 +38,12 @@ public class LockReleaseAction implements TaskAction<Void>
     return null;
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -80,6 +80,12 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
     return retVal;
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.metamx.druid.client.DataSegment;
@@ -12,7 +13,10 @@ import java.util.List;
 
 public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
 {
+  @JsonIgnore
   private final String dataSource;
+
+  @JsonIgnore
   private final Interval interval;
 
   @JsonCreator
@@ -48,6 +52,12 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
     return toolbox.getMergerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.metamx.druid.client.DataSegment;
@@ -12,7 +13,10 @@ import java.util.List;
 
 public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
 {
+  @JsonIgnore
   private final String dataSource;
+
+  @JsonIgnore
   private final Interval interval;
 
   @JsonCreator
@@ -48,6 +52,12 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
     return toolbox.getMergerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
@@ -14,6 +15,7 @@ import java.util.Set;
 
 public class SegmentNukeAction implements TaskAction<Void>
 {
+  @JsonIgnore
   private final Set<DataSegment> segments;
 
   @JsonCreator
@@ -57,6 +59,12 @@ public class SegmentNukeAction implements TaskAction<Void>
     return null;
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -1,6 +1,7 @@
 package com.metamx.druid.merger.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableList;
@@ -10,6 +11,7 @@ import java.util.List;
 
 public class SpawnTasksAction implements TaskAction<Void>
 {
+  @JsonIgnore
   private final List<Task> newTasks;
 
   @JsonCreator
@@ -41,6 +43,12 @@ public class SpawnTasksAction implements TaskAction<Void>
     return null;
   }
 
+  @Override
+  public boolean isAudited()
+  {
+    return true;
+  }
+
   @Override
   public String toString()
   {

View File

@@ -22,4 +22,5 @@ public interface TaskAction<RetType>
 {
   public TypeReference<RetType> getReturnTypeReference(); // T_T
   public RetType perform(Task task, TaskActionToolbox toolbox) throws IOException;
+  public boolean isAudited();
 }
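
Every action now declares whether it belongs in the coordinator's audit log; LocalTaskActionClient (above) checks this flag before calling storage.addAuditLog. As a hedged illustration only, a minimal read-only action under the new interface might look like the sketch below. The class name and no-op body are invented; the three interface methods are the ones in this diff.

package com.metamx.druid.merger.common.actions;

import com.fasterxml.jackson.core.type.TypeReference;
import com.metamx.druid.merger.common.task.Task;

import java.io.IOException;

// Hypothetical sketch: a side-effect-free action that opts out of audit logging.
public class ExampleProbeAction implements TaskAction<Void>
{
  @Override
  public TypeReference<Void> getReturnTypeReference()
  {
    return new TypeReference<Void>() {};
  }

  @Override
  public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
  {
    return null; // reads nothing, mutates nothing in this sketch
  }

  @Override
  public boolean isAudited()
  {
    return false; // no mutation, so no audit-log entry is warranted
  }
}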

View File

@@ -27,8 +27,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.metamx.druid.Query;
 import com.metamx.druid.merger.common.TaskStatus;
-import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.query.QueryRunner;
 import org.joda.time.Interval;
@@ -96,7 +96,7 @@ public abstract class AbstractTask implements Task
   }
 
   @Override
-  public TaskStatus preflight(TaskToolbox toolbox) throws Exception
+  public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
   {
     return TaskStatus.running(id);
   }
@@ -104,6 +104,7 @@ public abstract class AbstractTask implements Task
   @Override
   public void shutdown()
   {
+    // Do nothing.
   }
 
   @Override

View File

@@ -219,7 +219,7 @@ public class IndexGeneratorTask extends AbstractTask
     return schema.getShardSpec().isInChunk(eventDimensions);
   }
 
-  @JsonProperty
+  @JsonProperty("firehose")
   public FirehoseFactory getFirehoseFactory()
   {
     return firehoseFactory;

View File

@@ -31,6 +31,7 @@ import com.metamx.druid.indexer.granularity.GranularitySpec;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.actions.SpawnTasksAction;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.realtime.FirehoseFactory;
 import com.metamx.druid.realtime.Schema;
 import com.metamx.druid.shard.NoneShardSpec;
@@ -144,9 +145,9 @@ public class IndexTask extends AbstractTask
   }
 
   @Override
-  public TaskStatus preflight(TaskToolbox toolbox) throws Exception
+  public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
   {
-    toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
+    taskActionClient.submit(new SpawnTasksAction(toSubtasks()));
     return TaskStatus.success(getId());
   }

View File

@@ -36,11 +36,13 @@ import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import com.metamx.common.ISE;
 import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.actions.LockListAction;
 import com.metamx.druid.merger.common.actions.SegmentInsertAction;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.shard.NoneShardSpec;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
@@ -185,7 +187,7 @@ public abstract class MergeTaskBase extends AbstractTask
    * we are operating on every segment that overlaps the chosen interval.
    */
   @Override
-  public TaskStatus preflight(TaskToolbox toolbox)
+  public TaskStatus preflight(TaskActionClient taskActionClient)
   {
     try {
       final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
@@ -198,7 +200,7 @@ public abstract class MergeTaskBase extends AbstractTask
       };
 
       final Set<String> current = ImmutableSet.copyOf(
-          Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier)
+          Iterables.transform(taskActionClient.submit(defaultListUsedAction()), toIdentifier)
       );
       final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
@@ -312,6 +314,7 @@ public abstract class MergeTaskBase extends AbstractTask
              .dataSource(dataSource)
              .interval(mergedInterval)
              .version(version)
+             .binaryVersion(IndexIO.CURRENT_VERSION_ID)
              .shardSpec(new NoneShardSpec())
              .dimensions(Lists.newArrayList(mergedDimensions))
              .metrics(Lists.newArrayList(mergedMetrics))

View File

@@ -25,6 +25,7 @@ import com.google.common.base.Optional;
 import com.metamx.druid.Query;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.query.QueryRunner;
 import org.joda.time.Interval;
@@ -37,8 +38,8 @@ import org.joda.time.Interval;
  * <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
  * useful for producing sharded segments.</li>
  * <li>Tasks can optionally have an "implicit lock interval". Tasks with this property are guaranteed to have
- * a lock on that interval during their {@link #preflight(com.metamx.druid.merger.common.TaskToolbox)} and
- * {@link #run(com.metamx.druid.merger.common.TaskToolbox)} methods.</li>
+ * a lock on that interval during their {@link #preflight(com.metamx.druid.merger.common.actions.TaskActionClient)}
+ * and {@link #run(com.metamx.druid.merger.common.TaskToolbox)} methods.</li>
  * <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
  * to release locks early if they desire.</li>
  * </ul>
@@ -97,14 +98,14 @@ public interface Task
    * holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the
    * task should be considered a failure.
    *
-   * @param toolbox Toolbox for this task
+   * @param taskActionClient action client for this task (not the full toolbox)
    *
    * @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
    * using a worker).
    *
    * @throws Exception
    */
-  public TaskStatus preflight(TaskToolbox toolbox) throws Exception;
+  public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception;
 
   /**
    * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
@@ -119,5 +120,9 @@ public interface Task
    */
   public TaskStatus run(TaskToolbox toolbox) throws Exception;
 
+  /**
+   * Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
+   * a stronger effect.
+   */
   public void shutdown();
 }
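
To make the revised contract concrete, here is a hedged sketch of a task honoring both halves of it: preflight now takes only a TaskActionClient, and shutdown() is a cooperative, best-effort signal that run() polls. The class is declared abstract purely so the unrelated Task methods (getId(), getType(), and so on) can stay unimplemented; everything beyond the interface methods shown in this diff is invented for illustration.

package com.metamx.druid.merger.common.task;

import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.TaskActionClient;

// Hypothetical sketch, abstract so the rest of the Task surface need not be spelled out.
public abstract class SleepingTaskSketch implements Task
{
  // shutdown() flips this; run() polls it, so cancellation is cooperative.
  private volatile boolean stopping = false;

  @Override
  public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
  {
    return TaskStatus.running(getId()); // nothing to verify up front
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    for (int i = 0; i < 60 && !stopping; i++) {
      Thread.sleep(1000); // stand-in for one unit of real work
    }
    return stopping ? TaskStatus.failure(getId()) : TaskStatus.success(getId());
  }

  @Override
  public void shutdown()
  {
    stopping = true; // best-effort: run() notices at its next poll
  }
}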

View File

@@ -135,15 +135,13 @@ public class VersionConverterTask extends AbstractTask
   }
 
   @Override
-  public TaskStatus preflight(TaskToolbox toolbox) throws Exception
+  public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
   {
     if (segment != null) {
-      return super.preflight(toolbox);
+      return super.preflight(taskActionClient);
     }
 
-    final TaskActionClient taskClient = toolbox.getTaskActionClient();
-
-    List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
+    List<DataSegment> segments = taskActionClient.submit(defaultListUsedAction());
 
     final FunctionalIterable<Task> tasks = FunctionalIterable
         .create(segments)
@@ -164,7 +162,7 @@
       }
     );
 
-    taskClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
+    taskActionClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
 
     return TaskStatus.success(getId());
   }

View File

@@ -21,28 +21,32 @@ package com.metamx.druid.merger.coordinator;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.merger.common.RetryPolicy;
-import com.metamx.druid.merger.common.TaskCallback;
+import com.metamx.druid.Query;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.TaskToolboxFactory;
 import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.query.NoopQueryRunner;
+import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.query.segment.QuerySegmentWalker;
+import com.metamx.druid.query.segment.SegmentDescriptor;
+import com.metamx.emitter.EmittingLogger;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTime;
-import org.mortbay.thread.ThreadPool;
+import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -50,36 +54,45 @@ import java.util.concurrent.ThreadPoolExecutor;
 /**
  * Runs tasks in a JVM thread using an ExecutorService.
  */
-public class LocalTaskRunner implements TaskRunner
+public class ExecutorServiceTaskRunner implements TaskRunner, QuerySegmentWalker
 {
   private final TaskToolboxFactory toolboxFactory;
-  private final ExecutorService exec;
+  private final ListeningExecutorService exec;
   private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
 
-  private static final Logger log = new Logger(LocalTaskRunner.class);
+  private static final EmittingLogger log = new EmittingLogger(ExecutorServiceTaskRunner.class);
 
-  public LocalTaskRunner(
+  public ExecutorServiceTaskRunner(
       TaskToolboxFactory toolboxFactory,
       ExecutorService exec
   )
   {
    this.toolboxFactory = toolboxFactory;
-    this.exec = exec;
+    this.exec = MoreExecutors.listeningDecorator(exec);
   }
 
   @LifecycleStop
   public void stop()
   {
+    // TODO is this right
     exec.shutdownNow();
   }
 
   @Override
-  public void run(final Task task, final TaskCallback callback)
+  public ListenableFuture<TaskStatus> run(final Task task)
   {
     final TaskToolbox toolbox = toolboxFactory.build(task);
+    return exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
+  }
 
-    exec.submit(new LocalTaskRunnerRunnable(task, toolbox, callback));
+  @Override
+  public void shutdown(final String taskid)
+  {
+    for (final TaskRunnerWorkItem runningItem : runningItems) {
+      if (runningItem.getTask().getId().equals(taskid)) {
+        runningItem.getTask().shutdown();
+      }
+    }
   }
 
   @Override
@@ -102,8 +115,8 @@
       @Override
       public TaskRunnerWorkItem apply(Runnable input)
       {
-        if (input instanceof LocalTaskRunnerRunnable) {
-          return ((LocalTaskRunnerRunnable) input).getTaskRunnerWorkItem();
+        if (input instanceof ExecutorServiceTaskRunnerCallable) {
+          return ((ExecutorServiceTaskRunnerCallable) input).getTaskRunnerWorkItem();
         }
         return null;
       }
@@ -121,27 +134,72 @@
     return Lists.newArrayList();
   }
 
-  private static class LocalTaskRunnerRunnable implements Runnable
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
+  {
+    return getQueryRunnerImpl(query);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
+  {
+    return getQueryRunnerImpl(query);
+  }
+
+  private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
+  {
+    QueryRunner<T> queryRunner = null;
+
+    final List<Task> runningTasks = Lists.transform(
+        ImmutableList.copyOf(getRunningTasks()), new Function<TaskRunnerWorkItem, Task>()
+        {
+          @Override
+          public Task apply(TaskRunnerWorkItem o)
+          {
+            return o.getTask();
+          }
+        }
+    );
+
+    for (final Task task : runningTasks) {
+      if (task.getDataSource().equals(query.getDataSource())) {
+        final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
+
+        if (taskQueryRunner != null) {
+          if (queryRunner == null) {
+            queryRunner = taskQueryRunner;
+          } else {
+            log.makeAlert("Found too many query runners for datasource")
+               .addData("dataSource", query.getDataSource())
+               .emit();
+          }
+        }
+      }
+    }
+
+    return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
+  }
+
+  private static class ExecutorServiceTaskRunnerCallable implements Callable<TaskStatus>
   {
     private final Task task;
     private final TaskToolbox toolbox;
-    private final TaskCallback callback;
 
     private final DateTime createdTime;
 
-    public LocalTaskRunnerRunnable(Task task, TaskToolbox toolbox, TaskCallback callback)
+    public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox)
    {
       this.task = task;
       this.toolbox = toolbox;
-      this.callback = callback;
 
       this.createdTime = new DateTime();
     }
 
     @Override
-    public void run()
+    public TaskStatus call()
     {
       final long startTime = System.currentTimeMillis();
+      final File taskDir = toolbox.getTaskDir();
 
       TaskStatus status;
@@ -163,20 +221,22 @@
       }
 
       try {
-        final File taskDir = toolbox.getTaskDir();
-
         if (taskDir.exists()) {
           log.info("Removing task directory: %s", taskDir);
           FileUtils.deleteDirectory(taskDir);
         }
       }
       catch (Exception e) {
-        log.error(e, "Failed to delete task directory: %s", task.getId());
+        log.makeAlert(e, "Failed to delete task directory")
+           .addData("taskDir", taskDir.toString())
+           .addData("task", task.getId())
+           .emit();
       }
 
       try {
-        callback.notify(status.withDuration(System.currentTimeMillis() - startTime));
-      } catch(Exception e) {
+        return status.withDuration(System.currentTimeMillis() - startTime);
+      }
+      catch (Exception e) {
         log.error(e, "Uncaught Exception during callback for task[%s]", task);
         throw Throwables.propagate(e);
       }
@@ -186,15 +246,10 @@
     {
       return new TaskRunnerWorkItem(
           task,
-          callback,
+          null,
           null,
          createdTime
       );
     }
   }
-
-  @Override
-  public void shutdown(String taskId)
-  {
-  }
 }
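
The pivotal change above is wrapping the ExecutorService with Guava's MoreExecutors.listeningDecorator, so submit() yields a ListenableFuture that run() can return in place of invoking a TaskCallback. A self-contained toy illustrating just that mechanism; the Callable here merely stands in for a task:

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class ListeningDecoratorSketch
{
  public static void main(String[] args) throws Exception
  {
    // Wrap a plain executor; submit() now returns ListenableFuture instead of Future.
    final ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    final ListenableFuture<String> status = exec.submit(
        new Callable<String>()
        {
          @Override
          public String call()
          {
            return "SUCCESS"; // stand-in for the task's terminal TaskStatus
          }
        }
    );

    System.out.println(status.get()); // blocks until the "task" completes
    exec.shutdown();
  }
}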

View File

@@ -0,0 +1,293 @@
package com.metamx.druid.merger.coordinator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CharMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Runs tasks in separate processes using {@link ExecutorMain}.
 */
public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
{
  private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
  private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";

  private final Object lock = new Object();

  private final ForkingTaskRunnerConfig config;
  private final ListeningExecutorService exec;
  private final ObjectMapper jsonMapper;
  private final List<ProcessHolder> processes = Lists.newArrayList();

  public ForkingTaskRunner(
      ForkingTaskRunnerConfig config,
      ExecutorService exec,
      ObjectMapper jsonMapper
  )
  {
    this.config = config;
    this.exec = MoreExecutors.listeningDecorator(exec);
    this.jsonMapper = jsonMapper;
  }

  @Override
  public ListenableFuture<TaskStatus> run(final Task task)
  {
    return exec.submit(
        new Callable<TaskStatus>()
        {
          @Override
          public TaskStatus call()
          {
            // TODO Keep around for some amount of time?
            // TODO Directory per attempt? token? uuid?
            final File tempDir = Files.createTempDir();
            ProcessHolder processHolder = null;

            try {
              final File taskFile = new File(tempDir, "task.json");
              final File statusFile = new File(tempDir, "status.json");
              final File logFile = new File(tempDir, "log");

              // locked so we can choose childHost/childPort based on processes.size
              // and make sure we don't double up on ProcessHolders for a task
              synchronized (lock) {
                if (getProcessHolder(task.getId()).isPresent()) {
                  throw new ISE("Task already running: %s", task.getId());
                }

                final List<String> command = Lists.newArrayList();
                final int childPort = config.getStartPort() + processes.size();
                final String childHost = String.format(config.getHostPattern(), childPort);

                Iterables.addAll(
                    command,
                    ImmutableList.of(
                        config.getJavaCommand(),
                        "-cp",
                        config.getJavaClasspath()
                    )
                );

                Iterables.addAll(
                    command,
                    Splitter.on(CharMatcher.WHITESPACE)
                            .omitEmptyStrings()
                            .split(config.getJavaOptions())
                );

                for (String propName : System.getProperties().stringPropertyNames()) {
                  if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
                    command.add(
                        String.format(
                            "-D%s=%s",
                            propName.substring(CHILD_PROPERTY_PREFIX.length()),
                            System.getProperty(propName)
                        )
                    );
                  }
                }

                command.add(String.format("-Ddruid.host=%s", childHost));
                command.add(String.format("-Ddruid.port=%d", childPort));

                // TODO configurable
                command.add(ExecutorMain.class.getName());
                command.add(taskFile.toString());
                command.add(statusFile.toString());

                Files.write(jsonMapper.writeValueAsBytes(task), taskFile);

                log.info("Running command: %s", Joiner.on(" ").join(command));
                processHolder = new ProcessHolder(
                    task,
                    new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
                    logFile
                );

                processes.add(processHolder);
              }

              log.info("Logging task %s output to: %s", task.getId(), logFile);

              final OutputStream toLogfile = Files.newOutputStreamSupplier(logFile).getOutput();
              final InputStream fromProc = processHolder.process.getInputStream();

              ByteStreams.copy(fromProc, toLogfile);
              fromProc.close();
              toLogfile.close();

              final int statusCode = processHolder.process.waitFor();

              if (statusCode == 0) {
                // Process exited successfully
                return jsonMapper.readValue(statusFile, TaskStatus.class);
              } else {
                // Process exited unsuccessfully
                return TaskStatus.failure(task.getId());
              }
            }
            catch (InterruptedException e) {
              log.info(e, "Interrupted while waiting for process!");
              return TaskStatus.failure(task.getId());
            }
            catch (IOException e) {
              throw Throwables.propagate(e);
            }
            finally {
              if (processHolder != null) {
                synchronized (lock) {
                  processes.remove(processHolder);
                }
              }

              if (tempDir.exists()) {
                log.info("Removing temporary directory: %s", tempDir);
                // TODO may want to keep this around a bit longer
//                try {
//                  FileUtils.deleteDirectory(tempDir);
//                }
//                catch (IOException e) {
//                  log.error(e, "Failed to delete temporary directory");
//                }
              }
            }
          }
        }
    );
  }

  @LifecycleStop
  public void stop()
  {
    synchronized (lock) {
      exec.shutdownNow();

      for (ProcessHolder processHolder : processes) {
        log.info("Destroying process: %s", processHolder.process);
        processHolder.process.destroy();
      }
    }
  }

  @Override
  public void shutdown(final String taskid)
  {
    // TODO shutdown harder after more shutdowns
    final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);
    if (processHolder.isPresent()) {
      processHolder.get().shutdowns.incrementAndGet();
      processHolder.get().process.destroy();
    }
  }

  @Override
  public Collection<TaskRunnerWorkItem> getRunningTasks()
  {
    return ImmutableList.of();
  }

  @Override
  public Collection<TaskRunnerWorkItem> getPendingTasks()
  {
    return ImmutableList.of();
  }

  @Override
  public Collection<ZkWorker> getWorkers()
  {
    return ImmutableList.of();
  }

  @Override
  public Optional<InputSupplier<InputStream>> getLogs(final String taskid, final long offset)
  {
    final Optional<ProcessHolder> processHolder = getProcessHolder(taskid);

    if (processHolder.isPresent()) {
      return Optional.<InputSupplier<InputStream>>of(
          new InputSupplier<InputStream>()
          {
            @Override
            public InputStream getInput() throws IOException
            {
              final RandomAccessFile raf = new RandomAccessFile(processHolder.get().logFile, "r");
              final long rafLength = raf.length();
              if (offset > 0) {
                raf.seek(offset);
              } else if (offset < 0 && offset < rafLength) {
                raf.seek(rafLength + offset);
              }
              return Channels.newInputStream(raf.getChannel());
            }
          }
      );
    } else {
      return Optional.absent();
    }
  }

  private Optional<ProcessHolder> getProcessHolder(final String taskid)
  {
    synchronized (lock) {
      return Iterables.tryFind(
          processes, new Predicate<ProcessHolder>()
          {
            @Override
            public boolean apply(ProcessHolder processHolder)
            {
              return processHolder.task.getId().equals(taskid);
            }
          }
      );
    }
  }

  private static class ProcessHolder
  {
    private final Task task;
    private final Process process;
    private final File logFile;

    private AtomicInteger shutdowns = new AtomicInteger(0);

    private ProcessHolder(Task task, Process process, File logFile)
    {
      this.task = task;
      this.process = process;
      this.logFile = logFile;
    }
  }
}
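
One detail worth noting in getLogs above: a positive offset seeks forward from the start of the log file, while a negative offset is applied relative to the end, which is intended to give tail-style reads. Below is a hedged usage sketch, not part of this commit; the helper class is invented and assumes a constructed runner, and ByteStreams.toByteArray is standard Guava.

package com.metamx.druid.merger.coordinator;

import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;

import java.io.InputStream;

// Hypothetical helper: prints the last 4 KB of a running task's log, if any.
public class TailLogSketch
{
  public static void tail(ForkingTaskRunner runner, String taskId) throws Exception
  {
    // Negative offset: seek relative to end of file, per getLogs above.
    final Optional<InputSupplier<InputStream>> logs = runner.getLogs(taskId, -4096);
    if (logs.isPresent()) {
      final InputStream in = logs.get().getInput();
      try {
        System.out.write(ByteStreams.toByteArray(in));
      }
      finally {
        in.close();
      }
    }
  }
}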

View File

@@ -29,13 +29,14 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.merger.common.RetryPolicy;
 import com.metamx.druid.merger.common.RetryPolicyFactory;
-import com.metamx.druid.merger.common.TaskCallback;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
@@ -223,27 +224,19 @@
   /**
    * A task will be run only if there is no current knowledge in the RemoteTaskRunner of the task.
    *
-   * @param task     task to run
-   * @param callback callback to be called exactly once
+   * @param task task to run
    */
   @Override
-  public void run(Task task, TaskCallback callback)
+  public ListenableFuture<TaskStatus> run(final Task task)
   {
     if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
       throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
     }
     TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(
-        task, callback, retryPolicyFactory.makeRetryPolicy(), new DateTime()
+        task, SettableFuture.<TaskStatus>create(), retryPolicyFactory.makeRetryPolicy(), new DateTime()
     );
     addPendingTask(taskRunnerWorkItem);
-  }
-
-  private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
-  {
-    log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
-
-    pendingTasks.put(taskRunnerWorkItem.getTask().getId(), taskRunnerWorkItem);
-    runPendingTasks();
+    return taskRunnerWorkItem.getResult();
   }
 
   @Override
@@ -268,12 +261,12 @@
     while (!retryPolicy.hasExceededRetryThreshold()) {
       try {
         final String response = httpClient.post(url)
-                                          .setContent("application/json", jsonMapper.writeValueAsBytes(taskId))
-                                          .go(new ToStringResponseHandler(Charsets.UTF_8))
-                                          .get();
+            .setContent("application/json", jsonMapper.writeValueAsBytes(taskId))
+            .go(new ToStringResponseHandler(Charsets.UTF_8))
+            .get();
         log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response);
         return;
       }
       catch (Exception e) {
         log.error(e, "Exception shutting down taskId: %s", taskId);
@@ -294,6 +287,14 @@
     }
   }
 
+  private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem)
+  {
+    log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
+
+    pendingTasks.put(taskRunnerWorkItem.getTask().getId(), taskRunnerWorkItem);
+    runPendingTasks();
+  }
+
   /**
    * This method uses a single threaded executor to extract all pending tasks and attempt to run them. Any tasks that
    * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
@@ -507,9 +508,9 @@
     if (taskStatus.isComplete()) {
       if (taskRunnerWorkItem != null) {
-        final TaskCallback callback = taskRunnerWorkItem.getCallback();
-        if (callback != null) {
-          callback.notify(taskStatus);
+        final SettableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
+        if (result != null) {
+          result.set(taskStatus);
         }
       }
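
The handoff above hinges on Guava's SettableFuture: run() parks the writable future inside the work item and returns it to the caller, and the ZK status listener later completes it exactly once with the terminal status. In miniature (a self-contained toy, not Druid code):

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

public class SettableFutureSketch
{
  public static void main(String[] args) throws Exception
  {
    // The runner keeps the writable side in its work item...
    final SettableFuture<String> result = SettableFuture.create();

    // ...and hands the caller a read-only view, as run(Task) does.
    final ListenableFuture<String> status = result;

    // Later, a terminal status arrives and is set exactly once.
    result.set("SUCCESS");

    System.out.println(status.get()); // prints SUCCESS immediately
  }
}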

View File

@@ -0,0 +1,13 @@
package com.metamx.druid.merger.coordinator;

// TODO move to common or worker?

import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;

import java.io.InputStream;

public interface TaskLogProvider
{
  public Optional<InputSupplier<InputStream>> getLogs(String taskid, long offset);
}

View File

@@ -25,8 +25,8 @@ import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
-import com.metamx.druid.merger.common.TaskToolbox;
-import com.metamx.druid.merger.common.TaskToolboxFactory;
+import com.metamx.druid.merger.common.actions.TaskActionClient;
+import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
 import com.metamx.druid.merger.common.task.Task;
 import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
 import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
@@ -51,7 +51,7 @@
   private final ReentrantLock giant = new ReentrantLock();
   private final Condition mayBeStopped = giant.newCondition();
   private final TaskQueue taskQueue;
-  private final TaskToolboxFactory taskToolboxFactory;
+  private final TaskActionClientFactory taskActionClientFactory;
 
   private volatile boolean leading = false;
   private volatile TaskRunner taskRunner;
@@ -61,7 +61,7 @@
   public TaskMasterLifecycle(
       final TaskQueue taskQueue,
-      final TaskToolboxFactory taskToolboxFactory,
+      final TaskActionClientFactory taskActionClientFactory,
       final IndexerCoordinatorConfig indexerCoordinatorConfig,
       final ServiceDiscoveryConfig serviceDiscoveryConfig,
       final TaskRunnerFactory runnerFactory,
@@ -71,7 +71,7 @@
   )
   {
     this.taskQueue = taskQueue;
-    this.taskToolboxFactory = taskToolboxFactory;
+    this.taskActionClientFactory = taskActionClientFactory;
 
     this.leaderSelector = new LeaderSelector(
         curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener()
@@ -89,7 +89,7 @@
             final TaskConsumer taskConsumer = new TaskConsumer(
                 taskQueue,
                 taskRunner,
-                taskToolboxFactory,
+                taskActionClientFactory,
                 emitter
             );
@@ -109,12 +109,16 @@
             try {
               leaderLifecycle.start();
 
-              while (leading) {
+              while (leading && !Thread.currentThread().isInterrupted()) {
                 mayBeStopped.await();
               }
             }
+            catch (InterruptedException e) {
+              // Suppress so we can bow out gracefully
+            }
             finally {
               log.info("Bowing out!");
+              stopLeading();
               leaderLifecycle.stop();
             }
@@ -219,9 +223,9 @@
     return taskQueue;
   }
 
-  public TaskToolbox getTaskToolbox(Task task)
+  public TaskActionClient getTaskActionClient(Task task)
   {
-    return taskToolboxFactory.build(task);
+    return taskActionClientFactory.create(task);
   }
 
   public ResourceManagementScheduler getResourceManagementScheduler()

View File

@@ -19,7 +19,8 @@
 
 package com.metamx.druid.merger.coordinator;
 
-import com.metamx.druid.merger.common.TaskCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.task.Task;
 
 import java.util.Collection;
@@ -31,19 +32,22 @@ import java.util.Collection;
 public interface TaskRunner
 {
   /**
-   * Run a task with a particular context and call a callback. The callback may be called multiple times with RUNNING
-   * status, but should be called exactly once with a non-RUNNING status (e.g. SUCCESS, FAILED, CONTINUED...).
+   * Run a task. The returned status should be some kind of completed status.
    *
-   * @param task     task to run
-   * @param callback callback to be called exactly once
+   * @param task task to run
+   *
+   * @return task status, eventually
    */
-  public void run(Task task, TaskCallback callback);
+  public ListenableFuture<TaskStatus> run(Task task);
+
+  /**
+   * Best-effort task cancellation. May or may not do anything. Calling this multiple times may have
+   * a stronger effect.
+   */
+  public void shutdown(String taskid);
 
   public Collection<TaskRunnerWorkItem> getRunningTasks();
 
   public Collection<TaskRunnerWorkItem> getPendingTasks();
 
   public Collection<ZkWorker> getWorkers();
-
-  public void shutdown(String taskId);
 }

View File

@@ -20,8 +20,9 @@
 package com.metamx.druid.merger.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.druid.merger.common.RetryPolicy;
-import com.metamx.druid.merger.common.TaskCallback;
+import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.task.Task;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeComparator;
@@ -32,7 +33,7 @@
 public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
 {
   private final Task task;
-  private final TaskCallback callback;
+  private final SettableFuture<TaskStatus> result;
   private final RetryPolicy retryPolicy;
   private final DateTime createdTime;
 
@@ -40,13 +41,13 @@
   public TaskRunnerWorkItem(
       Task task,
-      TaskCallback callback,
+      SettableFuture<TaskStatus> result,
       RetryPolicy retryPolicy,
       DateTime createdTime
   )
   {
     this.task = task;
-    this.callback = callback;
+    this.result = result;
     this.retryPolicy = retryPolicy;
     this.createdTime = createdTime;
   }
@@ -57,9 +58,9 @@
     return task;
   }
 
-  public TaskCallback getCallback()
+  public SettableFuture<TaskStatus> getResult()
   {
-    return callback;
+    return result;
   }
 
   public RetryPolicy getRetryPolicy()
@@ -91,46 +92,12 @@
     return DateTimeComparator.getInstance().compare(createdTime, taskRunnerWorkItem.getCreatedTime());
   }
 
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    TaskRunnerWorkItem that = (TaskRunnerWorkItem) o;
-
-    if (callback != null ? !callback.equals(that.callback) : that.callback != null) {
-      return false;
-    }
-    if (retryPolicy != null ? !retryPolicy.equals(that.retryPolicy) : that.retryPolicy != null) {
-      return false;
-    }
-    if (task != null ? !task.equals(that.task) : that.task != null) {
-      return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    int result = task != null ? task.hashCode() : 0;
-    result = 31 * result + (callback != null ? callback.hashCode() : 0);
-    result = 31 * result + (retryPolicy != null ? retryPolicy.hashCode() : 0);
-    return result;
-  }
-
   @Override
   public String toString()
   {
     return "TaskRunnerWorkItem{" +
            "task=" + task +
-           ", callback=" + callback +
+           ", result=" + result +
            ", retryPolicy=" + retryPolicy +
            ", createdTime=" + createdTime +
            '}';

View File

@@ -0,0 +1,26 @@
package com.metamx.druid.merger.coordinator.config;

import org.skife.config.Config;
import org.skife.config.Default;

public abstract class ForkingTaskRunnerConfig
{
  @Config("druid.indexer.fork.java")
  @Default("java")
  public abstract String getJavaCommand();

  @Config("druid.indexer.fork.opts")
  @Default("")
  public abstract String getJavaOptions();

  @Config("druid.indexer.fork.classpath")
  public String getJavaClasspath()
  {
    return System.getProperty("java.class.path");
  }

  @Config("druid.indexer.fork.hostpattern")
  public abstract String getHostPattern();

  @Config("druid.indexer.fork.startport")
  public abstract int getStartPort();
}
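
These @Config keys are resolved by skife config-magic. Below is a hedged wiring sketch, not part of this commit: the property values are illustrative, and only the keys and @Default behavior come from the class above.

import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import org.skife.config.ConfigurationObjectFactory;

import java.util.Properties;

public class ForkingConfigSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty("druid.indexer.fork.hostpattern", "localhost:%d"); // %d receives the child port
    props.setProperty("druid.indexer.fork.startport", "8081");
    props.setProperty("druid.indexer.fork.opts", "-Xmx1g");

    // config-magic materializes the abstract class, applying @Default where unset.
    final ForkingTaskRunnerConfig config =
        new ConfigurationObjectFactory(props).build(ForkingTaskRunnerConfig.class);

    System.out.println(config.getJavaCommand()); // "java", from @Default
  }
}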

View File

@@ -20,11 +20,13 @@
package com.metamx.druid.merger.coordinator.exec;

import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
-import com.metamx.druid.merger.common.TaskToolboxFactory;
+import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
@@ -36,7 +38,7 @@ public class TaskConsumer implements Runnable
{
  private final TaskQueue queue;
  private final TaskRunner runner;
- private final TaskToolboxFactory toolboxFactory;
+ private final TaskActionClientFactory taskActionClientFactory;
  private final ServiceEmitter emitter;
  private final Thread thready;
@@ -47,13 +49,13 @@ public class TaskConsumer implements Runnable
  public TaskConsumer(
      TaskQueue queue,
      TaskRunner runner,
-     TaskToolboxFactory toolboxFactory,
+     TaskActionClientFactory taskActionClientFactory,
      ServiceEmitter emitter
  )
  {
    this.queue = queue;
    this.runner = runner;
-   this.toolboxFactory = toolboxFactory;
+   this.taskActionClientFactory = taskActionClientFactory;
    this.emitter = emitter;
    this.thready = new Thread(this);
  }
@@ -123,7 +125,7 @@ public class TaskConsumer implements Runnable
    // Run preflight checks
    TaskStatus preflightStatus;
    try {
-     preflightStatus = task.preflight(toolboxFactory.build(task));
+     preflightStatus = task.preflight(taskActionClientFactory.create(task));
      log.info("Preflight done for task: %s", task.getId());
    }
    catch (Exception e) {
@@ -138,15 +140,34 @@ public class TaskConsumer implements Runnable
    }

    // Hand off work to TaskRunner, with a callback
-   runner.run(
-       task, new TaskCallback()
+   final ListenableFuture<TaskStatus> status = runner.run(task);
+
+   Futures.addCallback(
+       status, new FutureCallback<TaskStatus>()
        {
          @Override
-         public void notify(final TaskStatus statusFromRunner)
+         public void onSuccess(final TaskStatus status)
+         {
+           log.info("Received %s status for task: %s", status.getStatusCode(), task);
+           handleStatus(status);
+         }
+
+         @Override
+         public void onFailure(Throwable t)
+         {
+           log.makeAlert(t, "Failed to run task")
+              .addData("task", task.getId())
+              .addData("type", task.getType())
+              .addData("dataSource", task.getDataSource())
+              .addData("interval", task.getImplicitLockInterval())
+              .emit();
+           handleStatus(TaskStatus.failure(task.getId()));
+         }
+
+         private void handleStatus(TaskStatus status)
          {
            try {
-             log.info("Received %s status for task: %s", statusFromRunner.getStatusCode(), task);
-
              // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
              // we check and before we commit the database transaction, but better than nothing.
              if (shutdown) {
@@ -154,34 +175,25 @@ public class TaskConsumer implements Runnable
                return;
              }

-             queue.notify(task, statusFromRunner);
+             queue.notify(task, status);

              // Emit event and log, if the task is done
-             if (statusFromRunner.isComplete()) {
-               metricBuilder.setUser3(statusFromRunner.getStatusCode().toString());
-               emitter.emit(metricBuilder.build("indexer/time/run/millis", statusFromRunner.getDuration()));
-
-               if (statusFromRunner.isFailure()) {
-                 log.makeAlert("Failed to index")
-                    .addData("task", task.getId())
-                    .addData("type", task.getType())
-                    .addData("dataSource", task.getDataSource())
-                    .addData("interval", task.getImplicitLockInterval())
-                    .emit();
-               }
+             if (status.isComplete()) {
+               metricBuilder.setUser3(status.getStatusCode().toString());
+               emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration()));

                log.info(
                    "Task %s: %s (%d run duration)",
-                   statusFromRunner.getStatusCode(),
+                   status.getStatusCode(),
                    task,
-                   statusFromRunner.getDuration()
+                   status.getDuration()
                );
              }
            }
            catch (Exception e) {
-             log.makeAlert(e, "Failed to handle task callback")
+             log.makeAlert(e, "Failed to handle task status")
                 .addData("task", task.getId())
-                .addData("statusCode", statusFromRunner.getStatusCode())
+                .addData("statusCode", status.getStatusCode())
                 .emit();
            }
          }
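
The shape of this change is the stock Guava future/callback idiom: the runner now returns a ListenableFuture<TaskStatus> and the consumer attaches a FutureCallback, which splits the old single notify() path into onSuccess/onFailure. A self-contained sketch of the same idiom (the names here are illustrative, not from the Druid codebase):

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class FutureCallbackSketch
{
  public static void main(String[] args)
  {
    // Decorate a plain executor so that submit() returns a ListenableFuture.
    final ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    // The "runner" hands back a future instead of accepting a callback argument.
    final ListenableFuture<String> status = exec.submit(
        new Callable<String>()
        {
          @Override
          public String call()
          {
            return "SUCCESS";
          }
        }
    );

    // Success and failure are handled on separate paths, as in TaskConsumer above.
    Futures.addCallback(
        status, new FutureCallback<String>()
        {
          @Override
          public void onSuccess(String result)
          {
            System.out.println("completed with: " + result);
          }

          @Override
          public void onFailure(Throwable t)
          {
            t.printStackTrace();
          }
        }
    );

    exec.shutdown();
  }
}

One consequence worth noting: exceptions thrown by the runner now surface through onFailure and are converted to TaskStatus.failure(...), where the old callback interface had no dedicated failure channel.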


@@ -21,11 +21,10 @@ package com.metamx.druid.merger.coordinator.http;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
-import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -40,11 +39,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
-import com.metamx.druid.BaseServerNode;
+import com.metamx.druid.RegisteringNode;
-import com.metamx.druid.client.ClientConfig;
-import com.metamx.druid.client.ClientInventoryManager;
-import com.metamx.druid.client.MutableServerView;
-import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
@@ -56,23 +51,18 @@ import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
-import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
-import com.metamx.druid.loading.DataSegmentKiller;
-import com.metamx.druid.loading.DataSegmentPusher;
-import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
-import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
+import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
-import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
+import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
-import com.metamx.druid.merger.coordinator.LocalTaskRunner;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.TaskLockbox;
@@ -83,6 +73,7 @@ import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.merger.coordinator.TaskStorage;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
+import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
@@ -95,9 +86,6 @@ import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
-import com.metamx.druid.realtime.SegmentAnnouncer;
-import com.metamx.druid.realtime.ZkSegmentAnnouncer;
-import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@@ -113,9 +101,6 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
@@ -135,7 +120,7 @@ import java.util.concurrent.atomic.AtomicReference;

/**
 */
-public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
+public class IndexerCoordinatorNode extends RegisteringNode
{
  private static final Logger log = new Logger(IndexerCoordinatorNode.class);
@@ -145,6 +130,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
  }

  private final Lifecycle lifecycle;
+ private final ObjectMapper jsonMapper;
  private final Properties props;
  private final ConfigurationObjectFactory configFactory;
@@ -152,11 +138,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
  private ServiceEmitter emitter = null;
  private DbConnectorConfig dbConnectorConfig = null;
  private DBI dbi = null;
- private RestS3Service s3Service = null;
  private IndexerCoordinatorConfig config = null;
- private TaskConfig taskConfig = null;
- private DataSegmentPusher segmentPusher = null;
- private TaskToolboxFactory taskToolboxFactory = null;
  private MergerDBCoordinator mergerDBCoordinator = null;
  private TaskStorage taskStorage = null;
  private TaskQueue taskQueue = null;
@@ -166,8 +148,8 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
  private IndexerZkConfig indexerZkConfig;
  private TaskRunnerFactory taskRunnerFactory = null;
  private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
+ private TaskActionClientFactory taskActionClientFactory = null;
  private TaskMasterLifecycle taskMasterLifecycle = null;
- private MutableServerView newSegmentServerView = null;
  private Server server = null;

  private boolean initialized = false;
@@ -176,14 +158,14 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
-     ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
-   super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);
+   super(ImmutableList.of(jsonMapper));

    this.lifecycle = lifecycle;
    this.props = props;
+   this.jsonMapper = jsonMapper;
    this.configFactory = configFactory;
  }
@@ -205,30 +187,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    return this;
  }

- public IndexerCoordinatorNode setNewSegmentServerView(MutableServerView newSegmentServerView)
- {
-   this.newSegmentServerView = newSegmentServerView;
-   return this;
- }
-
- public IndexerCoordinatorNode setS3Service(RestS3Service s3Service)
- {
-   this.s3Service = s3Service;
-   return this;
- }
-
  public IndexerCoordinatorNode setTaskLockbox(TaskLockbox taskLockbox)
  {
    this.taskLockbox = taskLockbox;
    return this;
  }

- public IndexerCoordinatorNode setSegmentPusher(DataSegmentPusher segmentPusher)
- {
-   this.segmentPusher = segmentPusher;
-   return this;
- }
-
  public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
  {
    this.mergerDBCoordinator = mergeDbCoordinator;
@@ -273,19 +237,14 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    initializeEmitter();
    initializeMonitors();
    initializeIndexerCoordinatorConfig();
-   initializeTaskConfig();
-   initializeS3Service();
    initializeMergeDBCoordinator();
-   initializeNewSegmentServerView();
    initializeTaskStorage();
    initializeTaskLockbox();
    initializeTaskQueue();
-   initializeDataSegmentPusher();
-   initializeTaskToolbox();
-   initializeJacksonInjections();
    initializeJacksonSubtypes();
    initializeCurator();
    initializeIndexerZkConfig();
+   initializeTaskActionClientFactory();
    initializeTaskRunnerFactory(configManager);
    initializeResourceManagement(configManager);
    initializeTaskMasterLifecycle();
@@ -314,10 +273,12 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    final Context staticContext = new Context(server, "/static", Context.SESSIONS);
    staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*");

-   ResourceCollection resourceCollection = new ResourceCollection(new String[] {
-     IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(),
-     IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm()
-   });
+   ResourceCollection resourceCollection = new ResourceCollection(
+       new String[]{
+           IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(),
+           IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm()
+       }
+   );
    staticContext.setBaseResource(resourceCollection);

    // TODO -- Need a QueryServlet and some kind of QuerySegmentWalker if we want to support querying tasks
@@ -364,13 +325,28 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    initialized = true;
  }

+ private ObjectMapper getJsonMapper()
+ {
+   return jsonMapper;
+ }
+
+ private void initializeTaskActionClientFactory()
+ {
+   if (taskActionClientFactory == null) {
+     taskActionClientFactory = new LocalTaskActionClientFactory(
+         taskStorage,
+         new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
+     );
+   }
+ }
+
  private void initializeTaskMasterLifecycle()
  {
    if (taskMasterLifecycle == null) {
      final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
      taskMasterLifecycle = new TaskMasterLifecycle(
          taskQueue,
-         taskToolboxFactory,
+         taskActionClientFactory,
          config,
          serviceDiscoveryConfig,
          taskRunnerFactory,
@@ -386,7 +362,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
  public synchronized void start() throws Exception
  {
    if (!initialized) {
-     init();
+     doInit();
    }

    lifecycle.start();
@@ -429,16 +405,6 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    }
  }

- private void initializeJacksonInjections()
- {
-   InjectableValues.Std injectables = new InjectableValues.Std();
-
-   injectables.addValue("s3Client", s3Service)
-              .addValue("segmentPusher", segmentPusher);
-
-   getJsonMapper().setInjectableValues(injectables);
- }
-
  private void initializeJacksonSubtypes()
  {
    getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
@@ -486,71 +452,6 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    }
  }

- private void initializeTaskConfig()
- {
-   if (taskConfig == null) {
-     taskConfig = configFactory.build(TaskConfig.class);
-   }
- }
-
- private void initializeNewSegmentServerView()
- {
-   if (newSegmentServerView == null) {
-     final MutableServerView view = new OnlyNewSegmentWatcherServerView();
-     final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
-         getConfigFactory().build(ClientConfig.class),
-         getPhoneBook(),
-         view
-     );
-     lifecycle.addManagedInstance(clientInventoryManager);
-
-     this.newSegmentServerView = view;
-   }
- }
-
- public void initializeS3Service() throws S3ServiceException
- {
-   this.s3Service = new RestS3Service(
-       new AWSCredentials(
-           PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
-           PropUtils.getProperty(props, "com.metamx.aws.secretKey")
-       )
-   );
- }
-
- public void initializeDataSegmentPusher()
- {
-   if (segmentPusher == null) {
-     segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
-   }
- }
-
- public void initializeTaskToolbox()
- {
-   if (taskToolboxFactory == null) {
-     final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
-         configFactory.build(ZkSegmentAnnouncerConfig.class),
-         getPhoneBook()
-     );
-     final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
-     taskToolboxFactory = new TaskToolboxFactory(
-         taskConfig,
-         new LocalTaskActionClientFactory(
-             taskStorage,
-             new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
-         ),
-         emitter,
-         s3Service,
-         segmentPusher,
-         dataSegmentKiller,
-         segmentAnnouncer,
-         newSegmentServerView,
-         getConglomerate(),
-         getJsonMapper()
-     );
-   }
- }
-
  public void initializeMergeDBCoordinator()
  {
    if (mergerDBCoordinator == null) {
@@ -602,7 +503,11 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
      taskStorage = new HeapMemoryTaskStorage();
    } else if (config.getStorageImpl().equals("db")) {
      final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
-     taskStorage = new DbTaskStorage(getJsonMapper(), dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI());
+     taskStorage = new DbTaskStorage(
+         getJsonMapper(),
+         dbConnectorConfig,
+         new DbConnector(dbConnectorConfig).getDBI()
+     );
    } else {
      throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
    }
@@ -640,7 +545,10 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
                  ImmutableMap.of("base_path", "druid.indexing")
              )
          ),
-         configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class)
+         configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
+         HttpClientInit.createClient(
+             HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
+         )
      );

      return remoteTaskRunner;
@@ -654,7 +562,11 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
          public TaskRunner build()
          {
            final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
-           return new LocalTaskRunner(taskToolboxFactory, runnerExec);
+           return new ForkingTaskRunner(
+               configFactory.build(ForkingTaskRunnerConfig.class),
+               runnerExec,
+               getJsonMapper()
+           );
          }
      };
    } else {
@@ -750,13 +662,8 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
    public IndexerCoordinatorNode build()
    {
-     if (jsonMapper == null && smileMapper == null) {
+     if (jsonMapper == null) {
        jsonMapper = new DefaultObjectMapper();
-       smileMapper = new DefaultObjectMapper(new SmileFactory());
-       smileMapper.getJsonFactory().setCodec(smileMapper);
-     }
-     else if (jsonMapper == null || smileMapper == null) {
-       throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
      }

      if (lifecycle == null) {
@@ -771,7 +678,7 @@ public class IndexerCoordinatorNode extends BaseServerNode<IndexerCoordinatorNode>
        configFactory = Config.createFactory(props);
      }

-     return new IndexerCoordinatorNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
+     return new IndexerCoordinatorNode(props, lifecycle, jsonMapper, configFactory);
    }
  }
}
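
Worth calling out in the hunks above: the coordinator's "local" runner mode no longer executes tasks in-process. Where the old code built a LocalTaskRunner around a TaskToolboxFactory, the new code builds a ForkingTaskRunner from a ForkingTaskRunnerConfig, an executor, and the JSON mapper, so tasks can be launched in separate JVMs. Accordingly, all of the toolbox wiring (S3 client, segment pusher, segment announcer, new-segment server view) moves out of the coordinator and into the forked executor process; see ExecutorNode further down.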


@@ -190,14 +190,13 @@ public class IndexerCoordinatorResource
  {
    final Map<String, Object> retMap;

+   // TODO make sure this worker is supposed to be running this task (attempt id? token?)
    try {
-     final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
-                                      .getTaskActionClient()
+     final T ret = taskMasterLifecycle.getTaskActionClient(holder.getTask())
                                       .submit(holder.getAction());
-     retMap = Maps.newHashMap();
-     retMap.put("result", ret);
-   }
-   catch (IOException e) {
+     retMap = ImmutableMap.<String, Object>of("result", ret);
+   } catch(IOException e) {
      return Response.serverError().build();
    }


@@ -19,24 +19,22 @@
package com.metamx.druid.merger.worker;

+import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.druid.Query;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
-import com.metamx.druid.query.NoopQueryRunner;
+import com.metamx.druid.merger.coordinator.TaskRunner;
-import com.metamx.druid.query.QueryRunner;
+import com.metamx.druid.merger.coordinator.TaskRunnerFactory;
import com.metamx.druid.query.segment.QuerySegmentWalker;
-import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.commons.io.FileUtils;
-import org.joda.time.Interval;

import java.io.File;
import java.util.List;
@@ -50,29 +48,32 @@ import java.util.concurrent.ExecutorService;
 * The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
 * realtime index tasks.
 */
-public class WorkerTaskMonitor implements QuerySegmentWalker
+public class WorkerTaskMonitor
{
  private static final EmittingLogger log = new EmittingLogger(WorkerTaskMonitor.class);

+ private final ObjectMapper jsonMapper;
  private final PathChildrenCache pathChildrenCache;
  private final CuratorFramework cf;
  private final WorkerCuratorCoordinator workerCuratorCoordinator;
- private final TaskToolboxFactory toolboxFactory;
+ private final TaskRunner taskRunner;
  private final ExecutorService exec;
  private final List<Task> running = new CopyOnWriteArrayList<Task>();

  public WorkerTaskMonitor(
+     ObjectMapper jsonMapper,
      PathChildrenCache pathChildrenCache,
      CuratorFramework cf,
      WorkerCuratorCoordinator workerCuratorCoordinator,
-     TaskToolboxFactory toolboxFactory,
+     TaskRunner taskRunner,
      ExecutorService exec
  )
  {
+   this.jsonMapper = jsonMapper;
    this.pathChildrenCache = pathChildrenCache;
    this.cf = cf;
    this.workerCuratorCoordinator = workerCuratorCoordinator;
-   this.toolboxFactory = toolboxFactory;
+   this.taskRunner = taskRunner;
    this.exec = exec;
  }
@@ -94,11 +95,10 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
              throws Exception
          {
            if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
-             final Task task = toolboxFactory.getObjectMapper().readValue(
+             final Task task = jsonMapper.readValue(
                  cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
                  Task.class
              );
-             final TaskToolbox toolbox = toolboxFactory.build(task);

              if (isTaskRunning(task)) {
                log.warn("Got task %s that I am already running...", task.getId());
@@ -113,7 +113,6 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
                    public void run()
                    {
                      final long startTime = System.currentTimeMillis();
-                     final File taskDir = toolbox.getTaskDir();

                      log.info("Running task [%s]", task.getId());
                      running.add(task);
@@ -122,7 +121,7 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
                      try {
                        workerCuratorCoordinator.unannounceTask(task.getId());
                        workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
-                       taskStatus = task.run(toolbox);
+                       taskStatus = taskRunner.run(task).get();
                      }
                      catch (Exception e) {
                        log.makeAlert(e, "Failed to run task")
@@ -144,19 +143,6 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
                           .addData("task", task.getId())
                           .emit();
                      }
-
-                     try {
-                       if (taskDir.exists()) {
-                         log.info("Removing task directory: %s", taskDir);
-                         FileUtils.deleteDirectory(taskDir);
-                       }
-                     }
-                     catch (Exception e) {
-                       log.makeAlert(e, "Failed to delete task directory")
-                          .addData("taskDir", taskDir.toString())
-                          .addData("task", task.getId())
-                          .emit();
-                     }
                    }
                  }
              );
@@ -196,38 +182,4 @@ public class WorkerTaskMonitor implements QuerySegmentWalker
         .emit();
    }
  }
-
- @Override
- public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
- {
-   return getQueryRunnerImpl(query);
- }
-
- @Override
- public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
- {
-   return getQueryRunnerImpl(query);
- }
-
- private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query) {
-   QueryRunner<T> queryRunner = null;
-
-   for (final Task task : running) {
-     if (task.getDataSource().equals(query.getDataSource())) {
-       final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
-
-       if (taskQueryRunner != null) {
-         if (queryRunner == null) {
-           queryRunner = taskQueryRunner;
-         } else {
-           log.makeAlert("Found too many query runners for datasource")
-              .addData("dataSource", query.getDataSource())
-              .emit();
-         }
-       }
-     }
-   }
-
-   return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
- }
}
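
Net effect of this hunk: WorkerTaskMonitor stops being a QuerySegmentWalker and stops running tasks in-process. It now deserializes the task from ZooKeeper with its own ObjectMapper and blocks on taskRunner.run(task).get(), so task-directory cleanup and query routing are no longer its concern; in this branch the forked executor serves queries itself through the QueryServlet that ExecutorNode (below) registers at /druid/v2/*.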


@@ -0,0 +1,81 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.worker.executor;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.log.LogLevelAdjuster;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import java.io.File;
/**
*/
public class ExecutorMain
{
  private static final Logger log = new Logger(ExecutorMain.class);

  public static void main(String[] args) throws Exception
  {
    LogLevelAdjuster.register();

    if (args.length != 2) {
      log.info("Usage: ExecutorMain <task.json> <status.json>");
      System.exit(2);
    }

    final ExecutorNode node = ExecutorNode.builder().build();
    final Lifecycle lifecycle = new Lifecycle();

    lifecycle.addManagedInstance(node);

    try {
      lifecycle.start();
    }
    catch (Throwable t) {
      log.info(t, "Throwable caught at startup, committing seppuku");
      System.exit(2);
    }

    try {
      final Task task = node.getJsonMapper().readValue(new File(args[0]), Task.class);

      log.info(
          "Running with task: %s",
          node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(task)
      );

      final TaskStatus status = node.run(task).get();

      log.info(
          "Task completed with status: %s",
          node.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(status)
      );

      node.getJsonMapper().writeValue(new File(args[1]), status);
    } finally {
      lifecycle.stop();
    }

    // TODO maybe this shouldn't be needed?
    System.exit(0);
  }
}
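
Together with ForkingTaskRunnerConfig above, this makes the fork contract visible: the parent writes a Task as JSON to a file, launches a child JVM whose entry point is ExecutorMain with the task file and a status file as its two arguments, and reads the TaskStatus JSON back from the second path when the child exits. Exit code 2 signals bad usage or a startup failure; 0 means a status file was written. A hypothetical launch, with the actual command line presumably assembled by ForkingTaskRunner (not shown in this diff) from the config's java command, opts, and classpath, would look like: java -Xmx1g -cp <druid.indexer.fork.classpath> com.metamx.druid.merger.worker.executor.ExecutorMain task.json status.json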


@@ -0,0 +1,514 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.client.ClientConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.metrics.JvmMonitor;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class ExecutorNode extends BaseServerNode<ExecutorNode>
{
  private static final EmittingLogger log = new EmittingLogger(ExecutorNode.class);

  public static Builder builder()
  {
    return new Builder();
  }

  private final Lifecycle lifecycle;
  private final Properties props;
  private final ConfigurationObjectFactory configFactory;

  private RestS3Service s3Service = null;
  private List<Monitor> monitors = null;
  private HttpClient httpClient = null;
  private ServiceEmitter emitter = null;
  private TaskConfig taskConfig = null;
  private WorkerConfig workerConfig = null;
  private DataSegmentPusher segmentPusher = null;
  private TaskToolboxFactory taskToolboxFactory = null;
  private CuratorFramework curatorFramework = null;
  private ServiceDiscovery serviceDiscovery = null;
  private ServiceProvider coordinatorServiceProvider = null;
  private MutableServerView newSegmentServerView = null;
  private Server server = null;
  private ExecutorServiceTaskRunner taskRunner = null;

  public ExecutorNode(
      Properties props,
      Lifecycle lifecycle,
      ObjectMapper jsonMapper,
      ObjectMapper smileMapper,
      ConfigurationObjectFactory configFactory
  )
  {
    super(log, props, lifecycle, jsonMapper, smileMapper, configFactory);

    this.lifecycle = lifecycle;
    this.props = props;
    this.configFactory = configFactory;
  }

  public ExecutorNode setHttpClient(HttpClient httpClient)
  {
    this.httpClient = httpClient;
    return this;
  }

  public ExecutorNode setEmitter(ServiceEmitter emitter)
  {
    this.emitter = emitter;
    return this;
  }

  public ExecutorNode setS3Service(RestS3Service s3Service)
  {
    this.s3Service = s3Service;
    return this;
  }

  public ExecutorNode setSegmentPusher(DataSegmentPusher segmentPusher)
  {
    this.segmentPusher = segmentPusher;
    return this;
  }

  public ExecutorNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
  {
    this.taskToolboxFactory = taskToolboxFactory;
    return this;
  }

  public ExecutorNode setCuratorFramework(CuratorFramework curatorFramework)
  {
    this.curatorFramework = curatorFramework;
    return this;
  }

  public ExecutorNode setCoordinatorServiceProvider(ServiceProvider coordinatorServiceProvider)
  {
    this.coordinatorServiceProvider = coordinatorServiceProvider;
    return this;
  }

  public ExecutorNode setServiceDiscovery(ServiceDiscovery serviceDiscovery)
  {
    this.serviceDiscovery = serviceDiscovery;
    return this;
  }

  public ExecutorNode setNewSegmentServerView(MutableServerView newSegmentServerView)
  {
    this.newSegmentServerView = newSegmentServerView;
    return this;
  }

  @Override
  public void doInit() throws Exception
  {
    initializeHttpClient();
    initializeEmitter();
    initializeS3Service();
    initializeMonitors();
    initializeMergerConfig();
    initializeCuratorFramework();
    initializeServiceDiscovery();
    initializeCoordinatorServiceProvider();
    initializeNewSegmentServerView();
    initializeDataSegmentPusher();
    initializeTaskToolbox();
    initializeTaskRunner();
    initializeJacksonInjections();
    initializeJacksonSubtypes();
    initializeServer();

    final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
    final ScheduledExecutorService globalScheduledExec = scheduledExecutorFactory.create(1, "Global--%d");
    final MonitorScheduler monitorScheduler = new MonitorScheduler(
        configFactory.build(MonitorSchedulerConfig.class),
        globalScheduledExec,
        emitter,
        monitors
    );
    lifecycle.addManagedInstance(monitorScheduler);

    final Context root = new Context(server, "/", Context.SESSIONS);

    root.addServlet(new ServletHolder(new StatusServlet()), "/status");
    root.addServlet(
        new ServletHolder(
            new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())
        ),
        "/druid/v2/*"
    );
  }

  @LifecycleStart
  public synchronized void start() throws Exception
  {
    init();
    lifecycle.start();
  }

  @LifecycleStop
  public synchronized void stop()
  {
    lifecycle.stop();
  }

  public synchronized ListenableFuture<TaskStatus> run(Task task)
  {
    return taskRunner.run(task);
  }

  private void initializeServer()
  {
    if (server == null) {
      server = Initialization.makeJettyServer(configFactory.build(ServerConfig.class));

      lifecycle.addHandler(
          new Lifecycle.Handler()
          {
            @Override
            public void start() throws Exception
            {
              log.info("Starting Jetty");
              server.start();
            }

            @Override
            public void stop()
            {
              log.info("Stopping Jetty");
              try {
                server.stop();
              }
              catch (Exception e) {
                log.error(e, "Exception thrown while stopping Jetty");
              }
            }
          }
      );
    }
  }

  private void initializeJacksonInjections()
  {
    InjectableValues.Std injectables = new InjectableValues.Std();

    injectables.addValue("s3Client", s3Service)
               .addValue("segmentPusher", segmentPusher);

    getJsonMapper().setInjectableValues(injectables);
  }

  private void initializeJacksonSubtypes()
  {
    getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
  }

  private void initializeHttpClient()
  {
    if (httpClient == null) {
      httpClient = HttpClientInit.createClient(
          HttpClientConfig.builder().withNumConnections(1).build(), lifecycle
      );
    }
  }

  private void initializeEmitter()
  {
    if (emitter == null) {
      emitter = new ServiceEmitter(
          PropUtils.getProperty(props, "druid.service"),
          PropUtils.getProperty(props, "druid.host"),
          Emitters.create(props, httpClient, getJsonMapper(), lifecycle)
      );
    }
    EmittingLogger.registerEmitter(emitter);
  }

  private void initializeS3Service() throws S3ServiceException
  {
    if (s3Service == null) {
      s3Service = new RestS3Service(
          new AWSCredentials(
              PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
              PropUtils.getProperty(props, "com.metamx.aws.secretKey")
          )
      );
    }
  }

  private void initializeMonitors()
  {
    if (monitors == null) {
      monitors = Lists.newArrayList();
      monitors.add(new JvmMonitor());
      monitors.add(new SysMonitor());
    }
  }

  private void initializeMergerConfig()
  {
    if (taskConfig == null) {
      taskConfig = configFactory.build(TaskConfig.class);
    }

    if (workerConfig == null) {
      workerConfig = configFactory.build(WorkerConfig.class);
    }
  }

  public void initializeDataSegmentPusher()
  {
    if (segmentPusher == null) {
      segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
    }
  }

  public void initializeTaskToolbox() throws S3ServiceException
  {
    if (taskToolboxFactory == null) {
      final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
      final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
          configFactory.build(ZkSegmentAnnouncerConfig.class),
          getPhoneBook()
      );
      lifecycle.addManagedInstance(segmentAnnouncer);
      taskToolboxFactory = new TaskToolboxFactory(
          taskConfig,
          new RemoteTaskActionClientFactory(
              httpClient,
              coordinatorServiceProvider,
              new RetryPolicyFactory(
                  configFactory.buildWithReplacements(
                      RetryPolicyConfig.class,
                      ImmutableMap.of("base_path", "druid.worker.taskActionClient")
                  )
              ),
              getJsonMapper()
          ),
          emitter,
          s3Service,
          segmentPusher,
          dataSegmentKiller,
          segmentAnnouncer,
          newSegmentServerView,
          getConglomerate(),
          getJsonMapper()
      );
    }
  }

  public void initializeCuratorFramework() throws IOException
  {
    if (curatorFramework == null) {
      final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
      curatorFramework = Initialization.makeCuratorFrameworkClient(
          curatorConfig,
          lifecycle
      );
    }
  }

  public void initializeServiceDiscovery() throws Exception
  {
    if (serviceDiscovery == null) {
      final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
          curatorFramework,
          config,
          lifecycle
      );
    }
  }

  public void initializeCoordinatorServiceProvider()
  {
    if (coordinatorServiceProvider == null) {
      this.coordinatorServiceProvider = Initialization.makeServiceProvider(
          workerConfig.getMasterService(),
          serviceDiscovery,
          lifecycle
      );
    }
  }

  private void initializeNewSegmentServerView()
  {
    if (newSegmentServerView == null) {
      final MutableServerView view = new OnlyNewSegmentWatcherServerView();
      final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
          getConfigFactory().build(ClientConfig.class),
          getPhoneBook(),
          view
      );
      lifecycle.addManagedInstance(clientInventoryManager);

      this.newSegmentServerView = view;
    }
  }

  public void initializeTaskRunner()
  {
    if (taskRunner == null) {
      final ExecutorServiceTaskRunner taskRunner = new ExecutorServiceTaskRunner(
          taskToolboxFactory,
          Executors.newSingleThreadExecutor(
              new ThreadFactoryBuilder()
                  .setNameFormat("task-runner-%d")
                  .build()
          )
      );

      this.taskRunner = taskRunner;
    }
  }

  public static class Builder
  {
    private ObjectMapper jsonMapper = null;
    private ObjectMapper smileMapper = null;
    private Lifecycle lifecycle = null;
    private Properties props = null;
    private ConfigurationObjectFactory configFactory = null;

    public Builder withMapper(ObjectMapper jsonMapper)
    {
      this.jsonMapper = jsonMapper;
      return this;
    }

    public Builder withLifecycle(Lifecycle lifecycle)
    {
      this.lifecycle = lifecycle;
      return this;
    }

    public Builder withProps(Properties props)
    {
      this.props = props;
      return this;
    }

    public Builder withConfigFactory(ConfigurationObjectFactory configFactory)
    {
      this.configFactory = configFactory;
      return this;
    }

    public ExecutorNode build()
    {
      if (jsonMapper == null && smileMapper == null) {
        jsonMapper = new DefaultObjectMapper();
        smileMapper = new DefaultObjectMapper(new SmileFactory());
        smileMapper.getJsonFactory().setCodec(smileMapper);
      }
      else if (jsonMapper == null || smileMapper == null) {
        throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
      }

      if (lifecycle == null) {
        lifecycle = new Lifecycle();
      }

      if (props == null) {
        props = Initialization.loadProperties();
      }

      if (configFactory == null) {
        configFactory = Config.createFactory(props);
      }

      return new ExecutorNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
    }
  }
}
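
One call in initializeTaskToolbox() above deserves unpacking: configFactory.buildWithReplacements(RetryPolicyConfig.class, ImmutableMap.of("base_path", "druid.worker.taskActionClient")) relies on config-magic's token substitution, which lets a single config class be bound under different property prefixes. A minimal sketch under that assumption; FooConfig and its key are hypothetical, and the ${token} syntax is config-magic's convention, not something defined in this diff:

import com.google.common.collect.ImmutableMap;
import org.skife.config.Config;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.config.Default;

public abstract class FooConfig
{
  // "${base_path}" is substituted before property lookup, so with the map below
  // the effective key becomes "druid.worker.taskActionClient.maxRetries".
  @Config("${base_path}.maxRetries")   // hypothetical key
  @Default("8")
  public abstract int getMaxRetries();

  // Usage, mirroring the RetryPolicyConfig call above.
  public static FooConfig bind(ConfigurationObjectFactory configFactory)
  {
    return configFactory.buildWithReplacements(
        FooConfig.class,
        ImmutableMap.of("base_path", "druid.worker.taskActionClient")
    );
  }
}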


@ -19,49 +19,36 @@
package com.metamx.druid.merger.worker.http; package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.servlet.GuiceFilter; import com.google.inject.Guice;
import com.metamx.common.ISE; import com.google.inject.Injector;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config; import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode; import com.metamx.druid.RegisteringNode;
import com.metamx.druid.client.ClientConfig; import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.client.ClientInventoryManager;
import com.metamx.druid.client.MutableServerView;
import com.metamx.druid.client.OnlyNewSegmentWatcherServerView;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.merger.common.RetryPolicyFactory;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.http.IndexerCoordinatorServletModule;
import com.metamx.druid.merger.worker.Worker; import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator; import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.metamx.druid.merger.worker.WorkerTaskMonitor; import com.metamx.druid.merger.worker.WorkerTaskMonitor;
import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncer;
import com.metamx.druid.realtime.ZkSegmentAnnouncerConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters; import com.metamx.emitter.core.Emitters;
@ -78,12 +65,8 @@ import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.cache.PathChildrenCache; import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
import com.netflix.curator.x.discovery.ServiceDiscovery; import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceProvider; import com.netflix.curator.x.discovery.ServiceProvider;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
@ -96,7 +79,7 @@ import java.util.concurrent.ScheduledExecutorService;
/** /**
*/ */
public class WorkerNode extends BaseServerNode<WorkerNode> public class WorkerNode extends RegisteringNode
{ {
private static final EmittingLogger log = new EmittingLogger(WorkerNode.class); private static final EmittingLogger log = new EmittingLogger(WorkerNode.class);
@ -107,22 +90,20 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
private final Lifecycle lifecycle; private final Lifecycle lifecycle;
private final Properties props; private final Properties props;
private final ObjectMapper jsonMapper;
private final ConfigurationObjectFactory configFactory; private final ConfigurationObjectFactory configFactory;
private RestS3Service s3Service = null;
private List<Monitor> monitors = null; private List<Monitor> monitors = null;
private HttpClient httpClient = null; private HttpClient httpClient = null;
private ServiceEmitter emitter = null; private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null; private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null; private WorkerConfig workerConfig = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private CuratorFramework curatorFramework = null; private CuratorFramework curatorFramework = null;
private ServiceDiscovery serviceDiscovery = null; private ServiceDiscovery serviceDiscovery = null;
private ServiceProvider coordinatorServiceProvider = null; private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null; private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private WorkerTaskMonitor workerTaskMonitor = null; private WorkerTaskMonitor workerTaskMonitor = null;
private MutableServerView newSegmentServerView = null; private ForkingTaskRunner forkingTaskRunner = null;
private Server server = null; private Server server = null;
private boolean initialized = false; private boolean initialized = false;
@ -131,14 +112,14 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
Properties props, Properties props,
Lifecycle lifecycle, Lifecycle lifecycle,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ObjectMapper smileMapper,
ConfigurationObjectFactory configFactory ConfigurationObjectFactory configFactory
) )
{ {
super(log, props, lifecycle, jsonMapper, smileMapper, configFactory); super(ImmutableList.of(jsonMapper));
this.lifecycle = lifecycle; this.lifecycle = lifecycle;
this.props = props; this.props = props;
this.jsonMapper = jsonMapper;
this.configFactory = configFactory; this.configFactory = configFactory;
} }
@ -154,24 +135,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
return this; return this;
} }
public WorkerNode setS3Service(RestS3Service s3Service)
{
this.s3Service = s3Service;
return this;
}
public WorkerNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public WorkerNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
{
this.taskToolboxFactory = taskToolboxFactory;
return this;
}
public WorkerNode setCuratorFramework(CuratorFramework curatorFramework) public WorkerNode setCuratorFramework(CuratorFramework curatorFramework)
{ {
this.curatorFramework = curatorFramework; this.curatorFramework = curatorFramework;
@@ -196,9 +159,9 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     return this;
   }

-  public WorkerNode setNewSegmentServerView(MutableServerView newSegmentServerView)
+  public WorkerNode setForkingTaskRunner(ForkingTaskRunner forkingTaskRunner)
   {
-    this.newSegmentServerView = newSegmentServerView;
+    this.forkingTaskRunner = forkingTaskRunner;
     return this;
   }
@@ -208,23 +171,18 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     return this;
   }

-  @Override
   public void doInit() throws Exception
   {
     initializeHttpClient();
     initializeEmitter();
-    initializeS3Service();
     initializeMonitors();
     initializeMergerConfig();
     initializeCuratorFramework();
     initializeServiceDiscovery();
     initializeCoordinatorServiceProvider();
-    initializeNewSegmentServerView();
-    initializeDataSegmentPusher();
-    initializeTaskToolbox();
-    initializeJacksonInjections();
     initializeJacksonSubtypes();
     initializeCuratorCoordinator();
+    initializeTaskRunner();
     initializeWorkerTaskMonitor();
     initializeServer();
@@ -238,24 +196,24 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     );
     lifecycle.addManagedInstance(monitorScheduler);

+    final Injector injector = Guice.createInjector(
+        new WorkerServletModule(
+            getJsonMapper(),
+            emitter,
+            forkingTaskRunner
+        )
+    );
+
     final Context root = new Context(server, "/", Context.SESSIONS);

     root.addServlet(new ServletHolder(new StatusServlet()), "/status");
-    root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*");
-    root.addServlet(
-        new ServletHolder(
-            new QueryServlet(getJsonMapper(), getSmileMapper(), workerTaskMonitor, emitter, getRequestLogger())
-        ),
-        "/druid/v2/*"
-    );
+    root.addEventListener(new GuiceServletConfig(injector));
+    root.addFilter(GuiceFilter.class, "/mmx/indexer/worker/v1/*", 0);
   }

   @LifecycleStart
   public synchronized void start() throws Exception
   {
     if (!initialized) {
-      init();
+      doInit();
     }

     lifecycle.start();
@@ -267,6 +225,11 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     lifecycle.stop();
   }

+  private ObjectMapper getJsonMapper()
+  {
+    return jsonMapper;
+  }
+
   private void initializeServer()
   {
     if (server == null) {
@@ -298,16 +261,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     }
   }

-  private void initializeJacksonInjections()
-  {
-    InjectableValues.Std injectables = new InjectableValues.Std();
-
-    injectables.addValue("s3Client", s3Service)
-               .addValue("segmentPusher", segmentPusher);
-
-    getJsonMapper().setInjectableValues(injectables);
-  }
-
   private void initializeJacksonSubtypes()
   {
     getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
@@ -334,18 +287,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     EmittingLogger.registerEmitter(emitter);
   }

-  private void initializeS3Service() throws S3ServiceException
-  {
-    if(s3Service == null) {
-      s3Service = new RestS3Service(
-          new AWSCredentials(
-              PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
-              PropUtils.getProperty(props, "com.metamx.aws.secretKey")
-          )
-      );
-    }
-  }
-
   private void initializeMonitors()
   {
     if (monitors == null) {
@@ -366,47 +307,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     }
   }

-  public void initializeDataSegmentPusher()
-  {
-    if (segmentPusher == null) {
-      segmentPusher = ServerInit.getSegmentPusher(props, configFactory, getJsonMapper());
-    }
-  }
-
-  public void initializeTaskToolbox() throws S3ServiceException
-  {
-    if (taskToolboxFactory == null) {
-      final DataSegmentKiller dataSegmentKiller = new S3DataSegmentKiller(s3Service);
-      final SegmentAnnouncer segmentAnnouncer = new ZkSegmentAnnouncer(
-          configFactory.build(ZkSegmentAnnouncerConfig.class),
-          getPhoneBook()
-      );
-      lifecycle.addManagedInstance(segmentAnnouncer);
-      taskToolboxFactory = new TaskToolboxFactory(
-          taskConfig,
-          new RemoteTaskActionClientFactory(
-              httpClient,
-              coordinatorServiceProvider,
-              new RetryPolicyFactory(
-                  configFactory.buildWithReplacements(
-                      RetryPolicyConfig.class,
-                      ImmutableMap.of("base_path", "druid.worker.taskActionClient")
-                  )
-              ),
-              getJsonMapper()
-          ),
-          emitter,
-          s3Service,
-          segmentPusher,
-          dataSegmentKiller,
-          segmentAnnouncer,
-          newSegmentServerView,
-          getConglomerate(),
-          getJsonMapper()
-      );
-    }
-  }
-
   public void initializeCuratorFramework() throws IOException
   {
     if (curatorFramework == null) {
@@ -454,18 +354,14 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     }
   }

-  private void initializeNewSegmentServerView()
+  public void initializeTaskRunner()
   {
-    if (newSegmentServerView == null) {
-      final MutableServerView view = new OnlyNewSegmentWatcherServerView();
-      final ClientInventoryManager clientInventoryManager = new ClientInventoryManager(
-          getConfigFactory().build(ClientConfig.class),
-          getPhoneBook(),
-          view
+    if (forkingTaskRunner == null) {
+      forkingTaskRunner = new ForkingTaskRunner(
+          configFactory.build(ForkingTaskRunnerConfig.class),
+          Executors.newFixedThreadPool(workerConfig.getCapacity()),
+          getJsonMapper()
       );
-      lifecycle.addManagedInstance(clientInventoryManager);
-
-      this.newSegmentServerView = view;
     }
   }
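(For orientation: a minimal sketch of what a forking runner of this shape does, namely submit each task to a bounded pool and fork one child JVM per task, treating the exit code as the outcome. The class ForkingRunnerSketch, the main class com.example.ChildMain, and the pool size of 4 are illustrative assumptions, not the actual ForkingTaskRunner API.)

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ForkingRunnerSketch
{
  // Bounded pool, analogous to Executors.newFixedThreadPool(workerConfig.getCapacity()) above.
  private final ExecutorService exec = Executors.newFixedThreadPool(4);

  public Future<Integer> run(final File taskFile)
  {
    return exec.submit(
        new Callable<Integer>()
        {
          @Override
          public Integer call() throws Exception
          {
            // Fork a child JVM for the task and wait for it; com.example.ChildMain is hypothetical.
            Process process = new ProcessBuilder(
                "java", "-cp", System.getProperty("java.class.path"),
                "com.example.ChildMain", taskFile.getAbsolutePath()
            ).redirectErrorStream(true).start();
            return process.waitFor();
          }
        }
    );
  }
}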
@@ -479,10 +375,11 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
           false
       );

       workerTaskMonitor = new WorkerTaskMonitor(
+          getJsonMapper(),
           pathChildrenCache,
           curatorFramework,
           workerCuratorCoordinator,
-          taskToolboxFactory,
+          forkingTaskRunner,
           workerExec
       );
       lifecycle.addManagedInstance(workerTaskMonitor);
@@ -492,7 +389,6 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
   public static class Builder
   {
     private ObjectMapper jsonMapper = null;
-    private ObjectMapper smileMapper = null;
     private Lifecycle lifecycle = null;
     private Properties props = null;
     private ConfigurationObjectFactory configFactory = null;
@@ -523,13 +419,8 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
     public WorkerNode build()
     {
-      if (jsonMapper == null && smileMapper == null) {
+      if (jsonMapper == null) {
         jsonMapper = new DefaultObjectMapper();
-        smileMapper = new DefaultObjectMapper(new SmileFactory());
-        smileMapper.getJsonFactory().setCodec(smileMapper);
-      }
-      else if (jsonMapper == null || smileMapper == null) {
-        throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper);
       }

       if (lifecycle == null) {
@@ -544,7 +435,7 @@ public class WorkerNode extends BaseServerNode<WorkerNode>
         configFactory = Config.createFactory(props);
       }

-      return new WorkerNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
+      return new WorkerNode(props, lifecycle, jsonMapper, configFactory);
     }
   }
 }

View File

@@ -3,7 +3,7 @@ package com.metamx.druid.merger.worker.http;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
-import com.metamx.druid.merger.common.control.TaskControl;
+import com.metamx.druid.merger.coordinator.TaskRunner;

 import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
@@ -19,22 +19,31 @@ public class WorkerResource
   private static final Logger log = new Logger(WorkerResource.class);

   private final ObjectMapper jsonMapper;
+  private final TaskRunner taskRunner;

   @Inject
   public WorkerResource(
-      ObjectMapper jsonMapper
+      ObjectMapper jsonMapper,
+      TaskRunner taskRunner
   ) throws Exception
   {
     this.jsonMapper = jsonMapper;
+    this.taskRunner = taskRunner;
   }

   @POST
-  @Path("/control")
+  @Path("/shutdown")
   @Consumes("application/json")
   @Produces("application/json")
-  public Response doControl(final TaskControl control)
+  public Response doShutdown(final String taskId)
   {
-    // TODO
+    try {
+      taskRunner.shutdown(taskId);
+    }
+    catch (Exception e) {
+      return Response.serverError().build();
+    }
+
     return Response.ok().build();
   }
 }
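(A hedged example of exercising the new endpoint. The /mmx/indexer/worker/v1 prefix matches the GuiceFilter mapping added in WorkerNode above, but the resource's full path and the exact request body format are not visible in this diff, so both are assumptions.)

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class ShutdownCallSketch
{
  public static void main(String[] args) throws Exception
  {
    // Assumed host/port/path; adjust to the worker's actual servlet mapping.
    URL url = new URL("http://localhost:8080/mmx/indexer/worker/v1/shutdown");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write("\"some_task_id\"".getBytes("UTF-8")); // task id as a JSON string (assumed)
    }
    System.out.println("HTTP " + conn.getResponseCode()); // 200 on success, 500 on error
  }
}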

View File

@@ -0,0 +1,52 @@
+package com.metamx.druid.merger.worker.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import com.google.inject.Provides;
+import com.metamx.druid.merger.coordinator.ForkingTaskRunner;
+import com.metamx.druid.merger.coordinator.http.IndexerCoordinatorResource;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.sun.jersey.guice.JerseyServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+
+import javax.inject.Singleton;
+
+/**
+ */
+public class WorkerServletModule extends JerseyServletModule
+{
+  private final ObjectMapper jsonMapper;
+  private final ServiceEmitter emitter;
+  private final ForkingTaskRunner forkingTaskRunner;
+
+  public WorkerServletModule(
+      ObjectMapper jsonMapper,
+      ServiceEmitter emitter,
+      ForkingTaskRunner forkingTaskRunner
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.emitter = emitter;
+    this.forkingTaskRunner = forkingTaskRunner;
+  }
+
+  @Override
+  protected void configureServlets()
+  {
+    bind(IndexerCoordinatorResource.class);
+    bind(ObjectMapper.class).toInstance(jsonMapper);
+    bind(ServiceEmitter.class).toInstance(emitter);
+    bind(ForkingTaskRunner.class).toInstance(forkingTaskRunner);
+
+    serve("/*").with(GuiceContainer.class);
+  }
+
+  @Provides
+  @Singleton
+  public JacksonJsonProvider getJacksonJsonProvider()
+  {
+    final JacksonJsonProvider provider = new JacksonJsonProvider();
+    provider.setMapper(jsonMapper);
+    return provider;
+  }
+}

View File

@@ -4,19 +4,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.metamx.common.ISE;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.merger.TestTask;
 import com.metamx.druid.merger.common.RetryPolicyFactory;
-import com.metamx.druid.merger.common.TaskCallback;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolboxFactory;
 import com.metamx.druid.merger.common.config.IndexerZkConfig;
+import com.metamx.druid.merger.common.config.RetryPolicyConfig;
 import com.metamx.druid.merger.common.config.TaskConfig;
 import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
-import com.metamx.druid.merger.common.config.RetryPolicyConfig;
 import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
 import com.metamx.druid.merger.worker.Worker;
 import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@@ -130,10 +131,7 @@ public class RemoteTaskRunnerTest
   @Test
   public void testRunNoExistingTask() throws Exception
   {
-    remoteTaskRunner.run(
-        task1,
-        null
-    );
+    remoteTaskRunner.run(task1);
   }

   @Test
@@ -146,11 +144,10 @@ public class RemoteTaskRunnerTest
             task1.getSegments(),
             Lists.<AggregatorFactory>newArrayList(),
             TaskStatus.running(task1.getId())
-        ),
-        null
+        )
     );
     try {
-      remoteTaskRunner.run(task1, null);
+      remoteTaskRunner.run(task1);
       fail("ISE expected");
     }
     catch (ISE expected) {
@@ -182,8 +179,7 @@ public class RemoteTaskRunnerTest
         ),
         Lists.<AggregatorFactory>newArrayList(),
         TaskStatus.success("foo")
-        ),
-        null
+        )
     );

     EasyMock.verify(emitter);
   }
@@ -192,22 +188,30 @@ public class RemoteTaskRunnerTest
   public void testRunWithCallback() throws Exception
   {
     final MutableBoolean callbackCalled = new MutableBoolean(false);
-    remoteTaskRunner.run(
-        new TestTask(
-            task1.getId(),
-            task1.getDataSource(),
-            task1.getSegments(),
-            Lists.<AggregatorFactory>newArrayList(),
-            TaskStatus.running(task1.getId())
-        ),
-        new TaskCallback()
-        {
-          @Override
-          public void notify(TaskStatus status)
-          {
-            callbackCalled.setValue(true);
-          }
-        }
+
+    Futures.addCallback(
+        remoteTaskRunner.run(
+            new TestTask(
+                task1.getId(),
+                task1.getDataSource(),
+                task1.getSegments(),
+                Lists.<AggregatorFactory>newArrayList(),
+                TaskStatus.running(task1.getId())
+            )
+        ), new FutureCallback<TaskStatus>()
+        {
+          @Override
+          public void onSuccess(TaskStatus taskStatus)
+          {
+            callbackCalled.setValue(true);
+          }
+
+          @Override
+          public void onFailure(Throwable throwable)
+          {
+            // neg
+          }
+        }
     );

     // Really don't like this way of waiting for the task to appear
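(The TaskCallback.notify hook gives way to a Guava ListenableFuture here. A self-contained sketch of the pattern, with SettableFuture standing in for the future a task runner would return; the two-argument Futures.addCallback overload is the one available in the Guava of this era.)

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;

public class CallbackSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the ListenableFuture<TaskStatus> a runner's run() would hand back.
    final SettableFuture<String> status = SettableFuture.create();

    Futures.addCallback(
        status,
        new FutureCallback<String>()
        {
          @Override
          public void onSuccess(String result)
          {
            System.out.println("task finished: " + result); // fires once the future resolves
          }

          @Override
          public void onFailure(Throwable t)
          {
            System.err.println("task failed: " + t);
          }
        }
    );

    status.set("SUCCESS"); // resolves the future and triggers onSuccess
  }
}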
@@ -277,35 +281,38 @@ public class RemoteTaskRunnerTest
     workerCuratorCoordinator.start();

     workerTaskMonitor = new WorkerTaskMonitor(
+        jsonMapper,
         new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
         cf,
         workerCuratorCoordinator,
-        new TaskToolboxFactory(
-            new TaskConfig()
-            {
-              @Override
-              public File getBaseTaskDir()
-              {
-                try {
-                  return File.createTempFile("billy", "yay");
-                }
-                catch (Exception e) {
-                  throw Throwables.propagate(e);
-                }
-              }
-
-              @Override
-              public int getDefaultRowFlushBoundary()
-              {
-                return 0;
-              }
-
-              @Override
-              public String getHadoopWorkingPath()
-              {
-                return null;
-              }
-            }, null, null, null, null, null, null, null, null, jsonMapper
-        ),
+        new ExecutorServiceTaskRunner(
+            new TaskToolboxFactory(
+                new TaskConfig()
+                {
+                  @Override
+                  public File getBaseTaskDir()
+                  {
+                    try {
+                      return File.createTempFile("billy", "yay");
+                    }
+                    catch (Exception e) {
+                      throw Throwables.propagate(e);
+                    }
+                  }
+
+                  @Override
+                  public int getDefaultRowFlushBoundary()
+                  {
+                    return 0;
+                  }
+
+                  @Override
+                  public String getHadoopWorkingPath()
+                  {
+                    return null;
+                  }
+                }, null, null, null, null, null, null, null, null, jsonMapper
+            ), Executors.newSingleThreadExecutor()
+        ),
         Executors.newSingleThreadExecutor()
     );
@@ -324,7 +331,8 @@ public class RemoteTaskRunnerTest
         pathChildrenCache,
         scheduledExec,
         new RetryPolicyFactory(new TestRetryPolicyConfig()),
-        new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, null, null))
+        new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, null, null)),
+        null
     );

     // Create a single worker and wait for things for be ready

View File

@@ -50,6 +50,7 @@ import com.metamx.druid.merger.common.actions.LockAcquireAction;
 import com.metamx.druid.merger.common.actions.LockListAction;
 import com.metamx.druid.merger.common.actions.LockReleaseAction;
 import com.metamx.druid.merger.common.actions.SegmentInsertAction;
+import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
 import com.metamx.druid.merger.common.actions.TaskActionToolbox;
 import com.metamx.druid.merger.common.config.TaskConfig;
 import com.metamx.druid.merger.common.task.AbstractTask;
@@ -87,6 +88,7 @@ public class TaskLifecycleTest
   private TaskQueue tq = null;
   private TaskRunner tr = null;
   private MockMergerDBCoordinator mdc = null;
+  private TaskActionClientFactory tac = null;
   private TaskToolboxFactory tb = null;
   private TaskConsumer tc = null;
   TaskStorageQueryAdapter tsqa = null;
@@ -111,6 +113,7 @@ public class TaskLifecycleTest
     tl = new TaskLockbox(ts);
     tq = new TaskQueue(ts, tl);
     mdc = newMockMDC();
+    tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter()));

     tb = new TaskToolboxFactory(
         new TaskConfig()
@@ -133,7 +136,7 @@ public class TaskLifecycleTest
             return null;
           }
         },
-        new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
+        tac,
         newMockEmitter(),
         null, // s3 client
         new DataSegmentPusher()
@@ -158,12 +161,12 @@ public class TaskLifecycleTest
         new DefaultObjectMapper()
     );

-    tr = new LocalTaskRunner(
+    tr = new ExecutorServiceTaskRunner(
         tb,
         Executors.newSingleThreadExecutor()
     );

-    tc = new TaskConsumer(tq, tr, tb, newMockEmitter());
+    tc = new TaskConsumer(tq, tr, tac, newMockEmitter());
     tsqa = new TaskStorageQueryAdapter(ts);

     tq.start();
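(A sketch of the shape ExecutorServiceTaskRunner implies: wrap a plain ExecutorService in Guava's listeningDecorator so run() can return a ListenableFuture that callers attach callbacks to. The task and status types are simplified stand-ins; the real runner builds a TaskToolbox per task.)

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class ExecutorRunnerSketch
{
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  // Completion or failure propagates to any FutureCallback the caller attaches.
  public ListenableFuture<String> run(final Callable<String> task)
  {
    return exec.submit(task);
  }
}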

View File

@@ -199,6 +199,10 @@ public class SimpleResourceManagementStrategyTest
     EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
             .andReturn(Lists.<String>newArrayList()).times(2);
+    EasyMock.expect(autoScalingStrategy.idToIpLookup(EasyMock.<List<String>>anyObject()))
+            .andReturn(Lists.<String>newArrayList());
+    EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
+            .andReturn(null);
     EasyMock.expect(autoScalingStrategy.provision()).andReturn(
         new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
     );
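(Why the two extra expectations matter: EasyMock mocks fail on any call that was not recorded, and the strategy under test now looks up IPs and terminates instances on this path. A self-contained illustration of the record/replay/verify cycle; the Scaler interface is a hypothetical stand-in for the project's AutoScalingStrategy.)

import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import java.util.List;

public class EasyMockFlowSketch
{
  interface Scaler
  {
    List<String> idToIpLookup(List<String> ids);
    Object terminate(List<String> ips);
  }

  public static void main(String[] args)
  {
    Scaler scaler = createMock(Scaler.class);
    expect(scaler.idToIpLookup(null)).andReturn(null); // record expected calls
    expect(scaler.terminate(null)).andReturn(null);
    replay(scaler);                                    // switch from recording to replay

    scaler.idToIpLookup(null);                         // the code under test would do this
    scaler.terminate(null);

    verify(scaler);                                    // fails if any expectation went unmet
  }
}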

View File

@@ -23,7 +23,7 @@
   <groupId>com.metamx</groupId>
   <artifactId>druid</artifactId>
   <packaging>pom</packaging>
-  <version>0.3.24-SNAPSHOT</version>
+  <version>0.3.26-SNAPSHOT</version>
   <name>druid</name>
   <description>druid</description>
   <scm>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>

   <properties>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.3.24-SNAPSHOT</version>
+    <version>0.3.26-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -82,8 +82,17 @@ public class SingleSegmentLoader implements SegmentLoader
           throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
         }
       }

-      legacyStorageDir.renameTo(localStorageDir);
+      final File parentDir = localStorageDir.getParentFile();
+      if (!parentDir.exists()) {
+        log.info("Parent[%s] didn't exist, creating.", parentDir);
+        if (!parentDir.mkdirs()) {
+          log.warn("Unable to make parentDir[%s]", parentDir);
+        }
+      }
+
+      if (!legacyStorageDir.renameTo(localStorageDir)) {
+        log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
+      }
     }
   }
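(The patch stops ignoring File.renameTo's boolean result and creates missing parent directories first. A general restatement of that pattern; SafeMove and its throw-on-mkdirs-failure behavior are an editorial sketch, stricter than the patch itself, which only logs a warning.)

import java.io.File;
import java.io.IOException;

public final class SafeMove
{
  // Move src to dest, creating dest's parent first; returns false if the rename fails
  // (renameTo commonly fails across filesystems, so callers should log or fall back to copy+delete).
  public static boolean moveInto(File src, File dest) throws IOException
  {
    File parent = dest.getParentFile();
    if (parent != null && !parent.exists() && !parent.mkdirs()) {
      throw new IOException("Unable to make parentDir[" + parent + "]");
    }
    return src.renameTo(dest);
  }
}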