From c9b411c0ca32dd1e53f1d27ea503e40b1037fa9d Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Mon, 4 Mar 2013 18:10:32 -0600 Subject: [PATCH] 1) Remove the need for TaskActions to require a Task as a constructor parameter --- .../merger/common/TaskToolboxFactory.java | 92 +++++++++++++++++++ .../common/actions/LocalTaskActionClient.java | 11 ++- .../actions/LocalTaskActionClientFactory.java | 43 +++++++++ .../common/actions/LockAcquireAction.java | 11 +-- .../merger/common/actions/LockListAction.java | 18 +--- .../common/actions/LockReleaseAction.java | 11 +-- .../actions/RemoteTaskActionClient.java | 7 +- .../RemoteTaskActionClientFactory.java | 47 ++++++++++ .../common/actions/SegmentInsertAction.java | 11 +-- .../actions/SegmentListUnusedAction.java | 11 +-- .../common/actions/SegmentListUsedAction.java | 11 +-- .../common/actions/SegmentNukeAction.java | 11 +-- .../common/actions/SpawnTasksAction.java | 11 +-- .../merger/common/actions/TaskAction.java | 3 +- .../actions/TaskActionClientFactory.java | 29 ++++++ .../common/actions/TaskActionHolder.java | 54 +++++++++++ .../merger/common/task/AbstractTask.java | 4 +- .../druid/merger/common/task/DeleteTask.java | 4 +- .../merger/common/task/HadoopIndexTask.java | 23 ++++- .../task/IndexDeterminePartitionsTask.java | 2 +- .../common/task/IndexGeneratorTask.java | 4 +- .../druid/merger/common/task/IndexTask.java | 2 +- .../druid/merger/common/task/KillTask.java | 6 +- .../druid/merger/common/task/MergeTask.java | 6 +- .../metamx/druid/merger/common/task/Task.java | 2 +- .../common/task/VersionConverterSubTask.java | 2 +- .../common/task/VersionConverterTask.java | 3 +- .../merger/coordinator/DbTaskStorage.java | 6 +- ...torage.java => HeapMemoryTaskStorage.java} | 8 +- .../merger/coordinator/LocalTaskRunner.java | 9 +- .../druid/merger/coordinator/TaskLockbox.java | 19 ++++ .../coordinator/TaskMasterLifecycle.java | 14 +-- .../druid/merger/coordinator/TaskStorage.java | 2 +- .../merger/coordinator/exec/TaskConsumer.java | 10 +- .../http/IndexerCoordinatorNode.java | 51 +++++----- .../http/IndexerCoordinatorResource.java | 8 +- .../druid/merger/worker/TaskMonitor.java | 10 +- .../druid/merger/worker/http/WorkerNode.java | 45 +++++---- .../coordinator/RemoteTaskRunnerTest.java | 3 +- .../merger/coordinator/TaskLifecycleTest.java | 86 ++++++++--------- .../merger/coordinator/TaskQueueTest.java | 35 +++---- pom.xml | 10 +- .../metamx/druid/realtime/RealtimeNode.java | 38 +------- .../druid/initialization/ServerInit.java | 39 ++++++++ 44 files changed, 549 insertions(+), 283 deletions(-) create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClientFactory.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClientFactory.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionClientFactory.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionHolder.java rename merger/src/main/java/com/metamx/druid/merger/coordinator/{LocalTaskStorage.java => HeapMemoryTaskStorage.java} (95%) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java new file mode 100644 index 00000000000..9a804bef79e --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolboxFactory.java @@ -0,0 +1,92 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.loading.DataSegmentPusher; +import com.metamx.druid.loading.MMappedQueryableIndexFactory; +import com.metamx.druid.loading.S3DataSegmentPuller; +import com.metamx.druid.loading.SegmentKiller; +import com.metamx.druid.loading.SegmentLoaderConfig; +import com.metamx.druid.loading.SegmentLoadingException; +import com.metamx.druid.loading.SingleSegmentLoader; +import com.metamx.druid.merger.common.actions.TaskActionClient; +import com.metamx.druid.merger.common.actions.TaskActionClientFactory; +import com.metamx.druid.merger.common.config.TaskConfig; +import com.metamx.druid.merger.common.task.Task; +import com.metamx.emitter.service.ServiceEmitter; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * Stuff that may be needed by a Task in order to conduct its business. + */ +public class TaskToolboxFactory +{ + private final TaskConfig config; + private final TaskActionClientFactory taskActionClient; + private final ServiceEmitter emitter; + private final RestS3Service s3Client; + private final DataSegmentPusher segmentPusher; + private final SegmentKiller segmentKiller; + private final ObjectMapper objectMapper; + + public TaskToolboxFactory( + TaskConfig config, + TaskActionClientFactory taskActionClient, + ServiceEmitter emitter, + RestS3Service s3Client, + DataSegmentPusher segmentPusher, + SegmentKiller segmentKiller, + ObjectMapper objectMapper + ) + { + this.config = config; + this.taskActionClient = taskActionClient; + this.emitter = emitter; + this.s3Client = s3Client; + this.segmentPusher = segmentPusher; + this.segmentKiller = segmentKiller; + this.objectMapper = objectMapper; + } + + public ObjectMapper getObjectMapper() + { + return objectMapper; + } + + public TaskToolbox build(Task task) + { + return new TaskToolbox( + config, + taskActionClient == null ? null : taskActionClient.create(task), + emitter, + s3Client, + segmentPusher, + segmentKiller, + objectMapper + ); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java index f6740064f52..e36dbf65a6c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClient.java @@ -1,17 +1,20 @@ package com.metamx.druid.merger.common.actions; +import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskStorage; import com.metamx.emitter.EmittingLogger; public class LocalTaskActionClient implements TaskActionClient { + private final Task task; private final TaskStorage storage; private final TaskActionToolbox toolbox; private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class); - public LocalTaskActionClient(TaskStorage storage, TaskActionToolbox toolbox) + public LocalTaskActionClient(Task task, TaskStorage storage, TaskActionToolbox toolbox) { + this.task = task; this.storage = storage; this.toolbox = toolbox; } @@ -19,15 +22,15 @@ public class LocalTaskActionClient implements TaskActionClient @Override public RetType submit(TaskAction taskAction) { - final RetType ret = taskAction.perform(toolbox); + final RetType ret = taskAction.perform(task, toolbox); // Add audit log try { - storage.addAuditLog(taskAction); + storage.addAuditLog(task, taskAction); } catch (Exception e) { log.makeAlert(e, "Failed to record action in audit log") - .addData("task", taskAction.getTask().getId()) + .addData("task", task.getId()) .addData("actionClass", taskAction.getClass().getName()) .emit(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClientFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClientFactory.java new file mode 100644 index 00000000000..89e37ceafab --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LocalTaskActionClientFactory.java @@ -0,0 +1,43 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.actions; + +import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.coordinator.TaskStorage; + +/** + */ +public class LocalTaskActionClientFactory implements TaskActionClientFactory +{ + private final TaskStorage storage; + private final TaskActionToolbox toolbox; + + public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox) + { + this.storage = storage; + this.toolbox = toolbox; + } + + @Override + public TaskActionClient create(Task task) + { + return new LocalTaskActionClient(task, storage, toolbox); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java index f669af33625..de325ba274f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockAcquireAction.java @@ -11,25 +11,16 @@ import org.joda.time.Interval; public class LockAcquireAction implements TaskAction> { - private final Task task; private final Interval interval; @JsonCreator public LockAcquireAction( - @JsonProperty("task") Task task, @JsonProperty("interval") Interval interval ) { - this.task = task; this.interval = interval; } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public Interval getInterval() { @@ -42,7 +33,7 @@ public class LockAcquireAction implements TaskAction> } @Override - public Optional perform(TaskActionToolbox toolbox) + public Optional perform(Task task, TaskActionToolbox toolbox) { try { return toolbox.getTaskLockbox().tryLock(task, interval); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java index e0e3eddb71f..06a2879ec47 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockListAction.java @@ -12,29 +12,13 @@ import java.util.List; public class LockListAction implements TaskAction> { - private final Task task; - - @JsonCreator - public LockListAction( - @JsonProperty("task") Task task - ) - { - this.task = task; - } - - @JsonProperty - public Task getTask() - { - return task; - } - public TypeReference> getReturnTypeReference() { return new TypeReference>() {}; } @Override - public List perform(TaskActionToolbox toolbox) + public List perform(Task task, TaskActionToolbox toolbox) { try { return toolbox.getTaskLockbox().findLocksForTask(task); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java index 5c84d024a50..b932e748ed1 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/LockReleaseAction.java @@ -12,25 +12,16 @@ import java.util.List; public class LockReleaseAction implements TaskAction { - private final Task task; private final Interval interval; @JsonCreator public LockReleaseAction( - @JsonProperty("task") Task task, @JsonProperty("interval") Interval interval ) { - this.task = task; this.interval = interval; } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public Interval getInterval() { @@ -43,7 +34,7 @@ public class LockReleaseAction implements TaskAction } @Override - public Void perform(TaskActionToolbox toolbox) + public Void perform(Task task, TaskActionToolbox toolbox) { try { toolbox.getTaskLockbox().unlock(task, interval); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java index 26900e29942..5cebc6ee1ec 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.metamx.common.logger.Logger; +import com.metamx.druid.merger.common.task.Task; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,14 +17,16 @@ import java.util.Map; public class RemoteTaskActionClient implements TaskActionClient { + private final Task task; private final HttpClient httpClient; private final ServiceProvider serviceProvider; private final ObjectMapper jsonMapper; private static final Logger log = new Logger(RemoteTaskActionClient.class); - public RemoteTaskActionClient(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper) + public RemoteTaskActionClient(Task task, HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper) { + this.task = task; this.httpClient = httpClient; this.serviceProvider = serviceProvider; this.jsonMapper = jsonMapper; @@ -33,7 +36,7 @@ public class RemoteTaskActionClient implements TaskActionClient public RetType submit(TaskAction taskAction) { try { - byte[] dataToSend = jsonMapper.writeValueAsBytes(taskAction); + byte[] dataToSend = jsonMapper.writeValueAsBytes(new TaskActionHolder(task, taskAction)); final String response = httpClient.post(getServiceUri().toURL()) .setContent("application/json", dataToSend) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClientFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClientFactory.java new file mode 100644 index 00000000000..659042bb592 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClientFactory.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.actions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.druid.merger.common.task.Task; +import com.metamx.http.client.HttpClient; +import com.netflix.curator.x.discovery.ServiceProvider; + +/** + */ +public class RemoteTaskActionClientFactory implements TaskActionClientFactory +{ + private final HttpClient httpClient; + private final ServiceProvider serviceProvider; + private final ObjectMapper jsonMapper; + + public RemoteTaskActionClientFactory(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper) + { + this.httpClient = httpClient; + this.serviceProvider = serviceProvider; + this.jsonMapper = jsonMapper; + } + + @Override + public TaskActionClient create(Task task) + { + return new RemoteTaskActionClient(task, httpClient, serviceProvider, jsonMapper); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java index 75ad4a9161f..5354e14878c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentInsertAction.java @@ -18,25 +18,16 @@ import java.util.Set; public class SegmentInsertAction implements TaskAction { - private final Task task; private final Set segments; @JsonCreator public SegmentInsertAction( - @JsonProperty("task") Task task, @JsonProperty("segments") Set segments ) { - this.task = task; this.segments = ImmutableSet.copyOf(segments); } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public Set getSegments() { @@ -49,7 +40,7 @@ public class SegmentInsertAction implements TaskAction } @Override - public Void perform(TaskActionToolbox toolbox) + public Void perform(Task task, TaskActionToolbox toolbox) { if(!toolbox.taskLockCoversSegments(task, segments, false)) { throw new ISE("Segments not covered by locks for task: %s", task.getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUnusedAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUnusedAction.java index b20d130064e..56304533a68 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUnusedAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUnusedAction.java @@ -12,28 +12,19 @@ import java.util.List; public class SegmentListUnusedAction implements TaskAction> { - private final Task task; private final String dataSource; private final Interval interval; @JsonCreator public SegmentListUnusedAction( - @JsonProperty("task") Task task, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { - this.task = task; this.dataSource = dataSource; this.interval = interval; } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public String getDataSource() { @@ -52,7 +43,7 @@ public class SegmentListUnusedAction implements TaskAction> } @Override - public List perform(TaskActionToolbox toolbox) + public List perform(Task task, TaskActionToolbox toolbox) { try { return toolbox.getMergerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUsedAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUsedAction.java index 0395057fe83..a776ed641cc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUsedAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentListUsedAction.java @@ -12,28 +12,19 @@ import java.util.List; public class SegmentListUsedAction implements TaskAction> { - private final Task task; private final String dataSource; private final Interval interval; @JsonCreator public SegmentListUsedAction( - @JsonProperty("task") Task task, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { - this.task = task; this.dataSource = dataSource; this.interval = interval; } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public String getDataSource() { @@ -52,7 +43,7 @@ public class SegmentListUsedAction implements TaskAction> } @Override - public List perform(TaskActionToolbox toolbox) + public List perform(Task task, TaskActionToolbox toolbox) { try { return toolbox.getMergerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentNukeAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentNukeAction.java index f1b61c58d9f..2ebedec0daf 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentNukeAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/SegmentNukeAction.java @@ -18,25 +18,16 @@ import java.util.Set; public class SegmentNukeAction implements TaskAction { - private final Task task; private final Set segments; @JsonCreator public SegmentNukeAction( - @JsonProperty("task") Task task, @JsonProperty("segments") Set segments ) { - this.task = task; this.segments = ImmutableSet.copyOf(segments); } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public Set getSegments() { @@ -49,7 +40,7 @@ public class SegmentNukeAction implements TaskAction } @Override - public Void perform(TaskActionToolbox toolbox) + public Void perform(Task task, TaskActionToolbox toolbox) { if(!toolbox.taskLockCoversSegments(task, segments, true)) { throw new ISE("Segments not covered by locks for task: %s", task.getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/SpawnTasksAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/SpawnTasksAction.java index a7a73d8eac7..ec48430c49a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/SpawnTasksAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/SpawnTasksAction.java @@ -11,25 +11,16 @@ import java.util.List; public class SpawnTasksAction implements TaskAction { - private final Task task; private final List newTasks; @JsonCreator public SpawnTasksAction( - @JsonProperty("task") Task task, @JsonProperty("newTasks") List newTasks ) { - this.task = task; this.newTasks = ImmutableList.copyOf(newTasks); } - @JsonProperty - public Task getTask() - { - return task; - } - @JsonProperty public List getNewTasks() { @@ -42,7 +33,7 @@ public class SpawnTasksAction implements TaskAction } @Override - public Void perform(TaskActionToolbox toolbox) + public Void perform(Task task, TaskActionToolbox toolbox) { try { for(final Task newTask : newTasks) { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskAction.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskAction.java index 7dedf50aad9..019b14a3b62 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskAction.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskAction.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.type.TypeReference; }) public interface TaskAction { - public Task getTask(); // TODO Look into replacing this with task ID so stuff serializes smaller public TypeReference getReturnTypeReference(); // T_T - public RetType perform(TaskActionToolbox toolbox); + public RetType perform(Task task, TaskActionToolbox toolbox); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionClientFactory.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionClientFactory.java new file mode 100644 index 00000000000..2784a442f31 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionClientFactory.java @@ -0,0 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.actions; + +import com.metamx.druid.merger.common.task.Task; + +/** + */ +public interface TaskActionClientFactory +{ + public TaskActionClient create(Task task); +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionHolder.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionHolder.java new file mode 100644 index 00000000000..a440447a226 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/TaskActionHolder.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.merger.common.task.Task; + +/** + */ +public class TaskActionHolder +{ + private final Task task; + private final TaskAction action; + + @JsonCreator + public TaskActionHolder( + @JsonProperty("task") Task task, + @JsonProperty("action") TaskAction action + ) + { + this.task = task; + this.action = action; + } + + @JsonProperty + public Task getTask() + { + return task; + } + + @JsonProperty + public TaskAction getAction() + { + return action; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 119eab3ec87..c2f9a792bb0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -102,9 +102,9 @@ public abstract class AbstractTask implements Task return ID_JOINER.join(objects); } - public SegmentListUsedAction makeListUsedAction() + public SegmentListUsedAction makeImplicitListUsedAction() { - return new SegmentListUsedAction(this, getDataSource(), getFixedInterval().get()); + return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get()); } public TaskStatus success() diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 991d76228dd..2f10861ef5f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Strategy: Create an empty segment covering the interval to be deleted - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this))); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final Interval interval = this.getImplicitLockInterval().get(); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); @@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask segment.getVersion() ); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java index 29c0c517f17..6e284557529 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; @@ -74,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask ); // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this))); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); log.info("Setting version to: %s", myLock.getVersion()); configCopy.setVersion(myLock.getVersion()); @@ -105,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask List publishedSegments = job.getPublishedSegments(); // Request segment pushes - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(publishedSegments))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 2886c8cc77a..47f72b12501 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask } ); - toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, nextTasks)); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index 7dc65797ea8..2a7b4683b8d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -100,7 +100,7 @@ public class IndexGeneratorTask extends AbstractTask public TaskStatus run(final TaskToolbox toolbox) throws Exception { // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this))); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); // We know this exists final Interval interval = getImplicitLockInterval().get(); @@ -193,7 +193,7 @@ public class IndexGeneratorTask extends AbstractTask ); // Request segment pushes - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(pushedSegments))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments))); // Done return TaskStatus.success(getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index 4e2f34a5171..35babcd6a22 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask @Override public TaskStatus preflight(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, toSubtasks())); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks())); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index bf1bbbabd90..52b8b48b8b4 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -72,7 +72,7 @@ public class KillTask extends AbstractTask public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this))); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); if(!myLock.getDataSource().equals(getDataSource())) { throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); @@ -85,7 +85,7 @@ public class KillTask extends AbstractTask // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new SegmentListUnusedAction(this, myLock.getDataSource(), myLock.getInterval())); + .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); // Verify none of these segments have versions > lock version for(final DataSegment unusedSegment : unusedSegments) { @@ -105,7 +105,7 @@ public class KillTask extends AbstractTask toolbox.getSegmentKiller().kill(unusedSegments); // Remove metadata for these segments - toolbox.getTaskActionClient().submit(new SegmentNukeAction(this, ImmutableSet.copyOf(unusedSegments))); + toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments))); return TaskStatus.success(getId()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 1791c2a097d..757a91c2598 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -119,7 +119,7 @@ public abstract class MergeTask extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this))); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); @@ -170,7 +170,7 @@ public abstract class MergeTask extends AbstractTask emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(uploadedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); return TaskStatus.success(getId()); } @@ -215,7 +215,7 @@ public abstract class MergeTask extends AbstractTask final Set current = ImmutableSet.copyOf( Iterables.transform( toolbox.getTaskActionClient() - .submit(new SegmentListUsedAction(this, getDataSource(), getImplicitLockInterval().get())), + .submit(new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get())), toIdentifier ) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index b1a4598a2d2..8418ecf40a8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -52,7 +52,7 @@ import org.joda.time.Interval; @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class), @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), - @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class), + @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class) }) public interface Task { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java index 4b5c3d251f0..db08e788e4d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java @@ -71,7 +71,7 @@ public class VersionConverterSubTask extends AbstractTask final File outLocation = new File(location, "v9_out"); if (IndexIO.convertSegment(location, outLocation)) { final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, Sets.newHashSet(updatedSegment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); } return success(); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index 062f3751767..4106e210547 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -50,11 +50,10 @@ public class VersionConverterTask extends AbstractTask { final TaskActionClient taskClient = toolbox.getTaskActionClient(); - List segments = taskClient.submit(makeListUsedAction()); + List segments = taskClient.submit(makeImplicitListUsedAction()); taskClient.submit( new SpawnTasksAction( - this, Lists.transform( segments, new Function() diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java index 5f39efae0bb..b878885dd4a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java @@ -327,11 +327,11 @@ public class DbTaskStorage implements TaskStorage } @Override - public void addAuditLog(final TaskAction taskAction) + public void addAuditLog(final Task task, final TaskAction taskAction) { Preconditions.checkNotNull(taskAction, "taskAction"); - log.info("Logging action for task[%s]: %s", taskAction.getTask().getId(), taskAction); + log.info("Logging action for task[%s]: %s", task.getId(), taskAction); dbi.withHandle( new HandleCallback() @@ -345,7 +345,7 @@ public class DbTaskStorage implements TaskStorage dbConnectorConfig.getTaskLogTable() ) ) - .bind("task_id", taskAction.getTask().getId()) + .bind("task_id", task.getId()) .bind("log_payload", jsonMapper.writeValueAsString(taskAction)) .execute(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java similarity index 95% rename from merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskStorage.java rename to merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java index 042f0b8196f..895804bc7fd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java @@ -40,14 +40,14 @@ import java.util.concurrent.locks.ReentrantLock; * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not * thread safe. */ -public class LocalTaskStorage implements TaskStorage +public class HeapMemoryTaskStorage implements TaskStorage { private final ReentrantLock giant = new ReentrantLock(); private final Map tasks = Maps.newHashMap(); private final Multimap taskLocks = HashMultimap.create(); private final Multimap taskActions = ArrayListMultimap.create(); - private static final Logger log = new Logger(LocalTaskStorage.class); + private static final Logger log = new Logger(HeapMemoryTaskStorage.class); @Override public void insert(Task task, TaskStatus status) @@ -185,12 +185,12 @@ public class LocalTaskStorage implements TaskStorage } @Override - public void addAuditLog(TaskAction taskAction) + public void addAuditLog(Task task, TaskAction taskAction) { giant.lock(); try { - taskActions.put(taskAction.getTask().getId(), taskAction); + taskActions.put(task.getId(), taskAction); } finally { giant.unlock(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java index 573152fce09..a4f58a361f8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java @@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.task.Task; import org.apache.commons.io.FileUtils; @@ -38,17 +39,17 @@ import java.util.concurrent.ExecutorService; */ public class LocalTaskRunner implements TaskRunner { - private final TaskToolbox toolbox; + private final TaskToolboxFactory toolboxFactory; private final ExecutorService exec; private static final Logger log = new Logger(LocalTaskRunner.class); public LocalTaskRunner( - TaskToolbox toolbox, + TaskToolboxFactory toolboxFactory, ExecutorService exec ) { - this.toolbox = toolbox; + this.toolboxFactory = toolboxFactory; this.exec = exec; } @@ -61,6 +62,8 @@ public class LocalTaskRunner implements TaskRunner @Override public void run(final Task task, final TaskCallback callback) { + final TaskToolbox toolbox = toolboxFactory.build(task); + exec.submit( new Runnable() { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskLockbox.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskLockbox.java index 07abd0594ac..0a4bd925d4d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskLockbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskLockbox.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator; import com.google.common.base.Function; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java index 0a38e5efa37..da73ede5923 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskMasterLifecycle.java @@ -26,6 +26,8 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; +import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.exec.TaskConsumer; import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler; @@ -49,7 +51,7 @@ public class TaskMasterLifecycle private final ReentrantLock giant = new ReentrantLock(); private final Condition mayBeStopped = giant.newCondition(); private final TaskQueue taskQueue; - private final TaskToolbox taskToolbox; + private final TaskToolboxFactory taskToolboxFactory; private volatile boolean leading = false; private volatile TaskRunner taskRunner; @@ -59,7 +61,7 @@ public class TaskMasterLifecycle public TaskMasterLifecycle( final TaskQueue taskQueue, - final TaskToolbox taskToolbox, + final TaskToolboxFactory taskToolboxFactory, final IndexerCoordinatorConfig indexerCoordinatorConfig, final ServiceDiscoveryConfig serviceDiscoveryConfig, final TaskRunnerFactory runnerFactory, @@ -69,7 +71,7 @@ public class TaskMasterLifecycle ) { this.taskQueue = taskQueue; - this.taskToolbox = taskToolbox; + this.taskToolboxFactory = taskToolboxFactory; this.leaderSelector = new LeaderSelector( curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener() @@ -87,7 +89,7 @@ public class TaskMasterLifecycle final TaskConsumer taskConsumer = new TaskConsumer( taskQueue, taskRunner, - taskToolbox, + taskToolboxFactory, emitter ); @@ -217,9 +219,9 @@ public class TaskMasterLifecycle return taskQueue; } - public TaskToolbox getTaskToolbox() + public TaskToolbox getTaskToolbox(Task task) { - return taskToolbox; + return taskToolboxFactory.build(task); } public ResourceManagementScheduler getResourceManagementScheduler() diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java index 2e21f52876b..d6bfbfd889e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java @@ -69,7 +69,7 @@ public interface TaskStorage /** * Add an action taken by a task to the audit log. */ - public void addAuditLog(TaskAction taskAction); + public void addAuditLog(Task task, TaskAction taskAction); /** * Returns all actions taken by a task. diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java index f1e31a9e5c1..48d3ecdc471 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java @@ -24,7 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskRunner; @@ -36,7 +36,7 @@ public class TaskConsumer implements Runnable { private final TaskQueue queue; private final TaskRunner runner; - private final TaskToolbox toolbox; + private final TaskToolboxFactory toolboxFactory; private final ServiceEmitter emitter; private final Thread thready; @@ -47,13 +47,13 @@ public class TaskConsumer implements Runnable public TaskConsumer( TaskQueue queue, TaskRunner runner, - TaskToolbox toolbox, + TaskToolboxFactory toolboxFactory, ServiceEmitter emitter ) { this.queue = queue; this.runner = runner; - this.toolbox = toolbox; + this.toolboxFactory = toolboxFactory; this.emitter = emitter; this.thready = new Thread(this); } @@ -123,7 +123,7 @@ public class TaskConsumer implements Runnable // Run preflight checks TaskStatus preflightStatus; try { - preflightStatus = task.preflight(toolbox); + preflightStatus = task.preflight(toolboxFactory.build(task)); log.info("Preflight done for task: %s", task.getId()); } catch (Exception e) { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 5da9c557936..833c3976ee8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -48,22 +48,21 @@ import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; +import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentPusherConfig; import com.metamx.druid.loading.S3SegmentKiller; import com.metamx.druid.loading.SegmentKiller; -import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.actions.LocalTaskActionClient; +import com.metamx.druid.merger.common.TaskToolboxFactory; +import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.TaskActionToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.coordinator.DbTaskStorage; +import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage; import com.metamx.druid.merger.coordinator.LocalTaskRunner; -import com.metamx.druid.merger.coordinator.LocalTaskStorage; import com.metamx.druid.merger.coordinator.MergerDBCoordinator; import com.metamx.druid.merger.coordinator.RemoteTaskRunner; import com.metamx.druid.merger.coordinator.RetryPolicyFactory; @@ -147,7 +146,8 @@ public class IndexerCoordinatorNode extends RegisteringNode private RestS3Service s3Service = null; private IndexerCoordinatorConfig config = null; private TaskConfig taskConfig = null; - private TaskToolbox taskToolbox = null; + private DataSegmentPusher segmentPusher = null; + private TaskToolboxFactory taskToolboxFactory = null; private MergerDBCoordinator mergerDBCoordinator = null; private TaskStorage taskStorage = null; private TaskQueue taskQueue = null; @@ -208,6 +208,12 @@ public class IndexerCoordinatorNode extends RegisteringNode return this; } + public IndexerCoordinatorNode setSegmentPusher(DataSegmentPusher segmentPusher) + { + this.segmentPusher = segmentPusher; + return this; + } + public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator) { this.mergerDBCoordinator = mergeDbCoordinator; @@ -252,6 +258,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeTaskStorage(); initializeTaskLockbox(); initializeTaskQueue(); + initializeDataSegmentPusher(); initializeTaskToolbox(); initializeJacksonInjections(); initializeJacksonSubtypes(); @@ -339,7 +346,7 @@ public class IndexerCoordinatorNode extends RegisteringNode final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); taskMasterLifecycle = new TaskMasterLifecycle( taskQueue, - taskToolbox, + taskToolboxFactory, config, serviceDiscoveryConfig, taskRunnerFactory, @@ -403,7 +410,7 @@ public class IndexerCoordinatorNode extends RegisteringNode InjectableValues.Std injectables = new InjectableValues.Std(); injectables.addValue("s3Client", s3Service) - .addValue("segmentPusher", taskToolbox.getSegmentPusher()); + .addValue("segmentPusher", segmentPusher); jsonMapper.setInjectableValues(injectables); } @@ -472,26 +479,26 @@ public class IndexerCoordinatorNode extends RegisteringNode ); } + public void initializeDataSegmentPusher() + { + if (segmentPusher == null) { + segmentPusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper); + } + } + public void initializeTaskToolbox() { - if (taskToolbox == null) { - final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher( - s3Service, - configFactory.build(S3DataSegmentPusherConfig.class), - jsonMapper - ); - final SegmentKiller segmentKiller = new S3SegmentKiller( - s3Service - ); - taskToolbox = new TaskToolbox( + if (taskToolboxFactory == null) { + final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service); + taskToolboxFactory = new TaskToolboxFactory( taskConfig, - new LocalTaskActionClient( + new LocalTaskActionClientFactory( taskStorage, new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter) ), emitter, s3Service, - dataSegmentPusher, + segmentPusher, segmentKiller, jsonMapper ); @@ -546,7 +553,7 @@ public class IndexerCoordinatorNode extends RegisteringNode { if (taskStorage == null) { if (config.getStorageImpl().equals("local")) { - taskStorage = new LocalTaskStorage(); + taskStorage = new HeapMemoryTaskStorage(); } else if (config.getStorageImpl().equals("db")) { final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class); taskStorage = new DbTaskStorage(jsonMapper, dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI()); @@ -615,7 +622,7 @@ public class IndexerCoordinatorNode extends RegisteringNode public TaskRunner build() { final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads()); - return new LocalTaskRunner(taskToolbox, runnerExec); + return new LocalTaskRunner(taskToolboxFactory, runnerExec); } }; } else { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index bde7bd6a2fd..3ac16741028 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.actions.TaskAction; +import com.metamx.druid.merger.common.actions.TaskActionHolder; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter; @@ -181,9 +182,12 @@ public class IndexerCoordinatorResource @POST @Path("/action") @Produces("application/json") - public Response doAction(final TaskAction action) + public Response doAction(final TaskActionHolder holder) { - final T ret = taskMasterLifecycle.getTaskToolbox().getTaskActionClient().submit(action); + final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask()) + .getTaskActionClient() + .submit(holder.getAction()); + final Map retMap = Maps.newHashMap(); retMap.put("result", ret); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java index d7836d46faa..ec3a8d992e0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java @@ -23,6 +23,7 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.task.Task; import com.metamx.emitter.EmittingLogger; import com.netflix.curator.framework.CuratorFramework; @@ -45,21 +46,21 @@ public class TaskMonitor private final PathChildrenCache pathChildrenCache; private final CuratorFramework cf; private final WorkerCuratorCoordinator workerCuratorCoordinator; - private final TaskToolbox toolbox; + private final TaskToolboxFactory toolboxFactory; private final ExecutorService exec; public TaskMonitor( PathChildrenCache pathChildrenCache, CuratorFramework cf, WorkerCuratorCoordinator workerCuratorCoordinator, - TaskToolbox toolbox, + TaskToolboxFactory toolboxFactory, ExecutorService exec ) { this.pathChildrenCache = pathChildrenCache; this.cf = cf; this.workerCuratorCoordinator = workerCuratorCoordinator; - this.toolbox = toolbox; + this.toolboxFactory = toolboxFactory; this.exec = exec; } @@ -81,10 +82,11 @@ public class TaskMonitor throws Exception { if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { - final Task task = toolbox.getObjectMapper().readValue( + final Task task = toolboxFactory.getObjectMapper().readValue( cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()), Task.class ); + final TaskToolbox toolbox = toolboxFactory.build(task); if (workerCuratorCoordinator.statusExists(task.getId())) { log.warn("Got task %s that I am already running...", task.getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index d02ffa5d9e3..bad04040e73 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -35,6 +35,7 @@ import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; +import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; @@ -43,7 +44,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig; import com.metamx.druid.loading.S3SegmentKiller; import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.actions.RemoteTaskActionClient; +import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -106,7 +109,8 @@ public class WorkerNode extends RegisteringNode private ServiceEmitter emitter = null; private TaskConfig taskConfig = null; private WorkerConfig workerConfig = null; - private TaskToolbox taskToolbox = null; + private DataSegmentPusher segmentPusher = null; + private TaskToolboxFactory taskToolboxFactory = null; private CuratorFramework curatorFramework = null; private ServiceDiscovery serviceDiscovery = null; private ServiceProvider coordinatorServiceProvider = null; @@ -149,9 +153,15 @@ public class WorkerNode extends RegisteringNode return this; } - public WorkerNode setTaskToolbox(TaskToolbox taskToolbox) + public WorkerNode setSegmentPusher(DataSegmentPusher segmentPusher) { - this.taskToolbox = taskToolbox; + this.segmentPusher = segmentPusher; + return this; + } + + public WorkerNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory) + { + this.taskToolboxFactory = taskToolboxFactory; return this; } @@ -195,6 +205,7 @@ public class WorkerNode extends RegisteringNode initializeCuratorFramework(); initializeServiceDiscovery(); initializeCoordinatorServiceProvider(); + initializeDataSegmentPusher(); initializeTaskToolbox(); initializeJacksonInjections(); initializeJacksonSubtypes(); @@ -271,7 +282,7 @@ public class WorkerNode extends RegisteringNode InjectableValues.Std injectables = new InjectableValues.Std(); injectables.addValue("s3Client", s3Service) - .addValue("segmentPusher", taskToolbox.getSegmentPusher()); + .addValue("segmentPusher", segmentPusher); jsonMapper.setInjectableValues(injectables); } @@ -334,23 +345,23 @@ public class WorkerNode extends RegisteringNode } } + public void initializeDataSegmentPusher() + { + if (segmentPusher == null) { + segmentPusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper); + } + } + public void initializeTaskToolbox() throws S3ServiceException { - if (taskToolbox == null) { - final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher( - s3Service, - configFactory.build(S3DataSegmentPusherConfig.class), - jsonMapper - ); - final SegmentKiller segmentKiller = new S3SegmentKiller( - s3Service - ); - taskToolbox = new TaskToolbox( + if (taskToolboxFactory == null) { + final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service); + taskToolboxFactory = new TaskToolboxFactory( taskConfig, - new RemoteTaskActionClient(httpClient, coordinatorServiceProvider, jsonMapper), + new RemoteTaskActionClientFactory(httpClient, coordinatorServiceProvider, jsonMapper), emitter, s3Service, - dataSegmentPusher, + segmentPusher, segmentKiller, jsonMapper ); @@ -417,7 +428,7 @@ public class WorkerNode extends RegisteringNode pathChildrenCache, curatorFramework, workerCuratorCoordinator, - taskToolbox, + taskToolboxFactory, workerExec ); lifecycle.addManagedInstance(taskMonitor); diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index 9242e19f355..18ce14a4555 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -12,6 +12,7 @@ import com.metamx.druid.merger.TestTask; import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskToolboxFactory; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; @@ -280,7 +281,7 @@ public class RemoteTaskRunnerTest new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), cf, workerCuratorCoordinator, - new TaskToolbox( + new TaskToolboxFactory( new TaskConfig() { @Override diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index ae5b46fdfc1..3dc889b96f0 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator; import com.google.common.base.Optional; @@ -22,11 +41,11 @@ import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.SegmentKiller; -import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.actions.LocalTaskActionClient; +import com.metamx.druid.merger.common.TaskToolboxFactory; +import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.LockAcquireAction; import com.metamx.druid.merger.common.actions.LockListAction; import com.metamx.druid.merger.common.actions.LockReleaseAction; @@ -59,7 +78,6 @@ import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; @@ -71,7 +89,7 @@ public class TaskLifecycleTest private TaskQueue tq = null; private TaskRunner tr = null; private MockMergerDBCoordinator mdc = null; - private TaskToolbox tb = null; + private TaskToolboxFactory tb = null; private TaskConsumer tc = null; TaskStorageQueryAdapter tsqa = null; @@ -91,12 +109,12 @@ public class TaskLifecycleTest tmp = Files.createTempDir(); - ts = new LocalTaskStorage(); + ts = new HeapMemoryTaskStorage(); tl = new TaskLockbox(ts); tq = new TaskQueue(ts, tl); mdc = newMockMDC(); - tb = new TaskToolbox( + tb = new TaskToolboxFactory( new TaskConfig() { @Override @@ -117,7 +135,7 @@ public class TaskLifecycleTest return null; } }, - new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())), + new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())), newMockEmitter(), null, // s3 client new DataSegmentPusher() @@ -137,16 +155,7 @@ public class TaskLifecycleTest } }, new DefaultObjectMapper() - ) - { - @Override - public Map getSegments( - Task task, List segments - ) throws SegmentLoadingException - { - return ImmutableMap.of(); - } - }; + ); tr = new LocalTaskRunner( tb, @@ -239,11 +248,12 @@ public class TaskLifecycleTest @Test public void testKillTask() throws Exception { - // TODO: Worst test ever + // This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator + // Such that this test can test things... final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D")); - final TaskStatus mergedStatus = runTask(killTask); - Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, mergedStatus.getStatusCode()); + final TaskStatus status = runTask(killTask); + Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); } @@ -273,8 +283,8 @@ public class TaskLifecycleTest // Sort of similar to what realtime tasks do: // Acquire lock for first interval - final Optional lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(this, interval1)); - final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction(this)); + final Optional lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); + final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock1 present", lock1.isPresent()); @@ -282,8 +292,8 @@ public class TaskLifecycleTest Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1); // Acquire lock for second interval - final Optional lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(this, interval2)); - final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction(this)); + final Optional lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); + final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertTrue("lock2 present", lock2.isPresent()); @@ -294,7 +304,6 @@ public class TaskLifecycleTest toolbox.getTaskActionClient() .submit( new SegmentInsertAction( - this, ImmutableSet.of( DataSegment.builder() .dataSource("foo") @@ -306,8 +315,8 @@ public class TaskLifecycleTest ); // Release first lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(this, interval1)); - final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction(this)); + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); + final List locks3 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3); @@ -316,7 +325,6 @@ public class TaskLifecycleTest toolbox.getTaskActionClient() .submit( new SegmentInsertAction( - this, ImmutableSet.of( DataSegment.builder() .dataSource("foo") @@ -328,8 +336,8 @@ public class TaskLifecycleTest ); // Release second lock - toolbox.getTaskActionClient().submit(new LockReleaseAction(this, interval2)); - final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction(this)); + toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); + final List locks4 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) Assert.assertEquals("locks4", ImmutableList.of(), locks4); @@ -363,7 +371,7 @@ public class TaskLifecycleTest { final TaskLock myLock = Iterables.getOnlyElement( toolbox.getTaskActionClient() - .submit(new LockListAction(this)) + .submit(new LockListAction()) ); final DataSegment segment = DataSegment.builder() @@ -372,7 +380,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -398,10 +406,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement( - toolbox.getTaskActionClient() - .submit(new LockListAction(this)) - ); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -409,7 +414,7 @@ public class TaskLifecycleTest .version(myLock.getVersion()) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; @@ -435,10 +440,7 @@ public class TaskLifecycleTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final TaskLock myLock = Iterables.getOnlyElement( - toolbox.getTaskActionClient() - .submit(new LockListAction(this)) - ); + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); final DataSegment segment = DataSegment.builder() .dataSource("ds") @@ -446,7 +448,7 @@ public class TaskLifecycleTest .version(myLock.getVersion() + "1!!!1!!") .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment))); + toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); return TaskStatus.success(getId()); } }; diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java index 0d003551ea9..939dc9b6b21 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java @@ -26,7 +26,8 @@ import com.google.common.collect.Sets; import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.actions.LocalTaskActionClient; +import com.metamx.druid.merger.common.TaskToolboxFactory; +import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.TaskActionToolbox; import com.metamx.druid.merger.common.task.AbstractTask; @@ -43,7 +44,7 @@ public class TaskQueueTest @Test public void testEmptyQueue() throws Exception { - final TaskStorage ts = new LocalTaskStorage(); + final TaskStorage ts = new HeapMemoryTaskStorage(); final TaskLockbox tl = new TaskLockbox(ts); final TaskQueue tq = newTaskQueue(ts, tl); @@ -65,7 +66,7 @@ public class TaskQueueTest @Test public void testAddRemove() throws Exception { - final TaskStorage ts = new LocalTaskStorage(); + final TaskStorage ts = new HeapMemoryTaskStorage(); final TaskLockbox tl = new TaskLockbox(ts); final TaskQueue tq = newTaskQueue(ts, tl); @@ -154,12 +155,12 @@ public class TaskQueueTest @Test public void testContinues() throws Exception { - final TaskStorage ts = new LocalTaskStorage(); + final TaskStorage ts = new HeapMemoryTaskStorage(); final TaskLockbox tl = new TaskLockbox(ts); final TaskQueue tq = newTaskQueue(ts, tl); - final TaskToolbox tb = new TaskToolbox( + final TaskToolboxFactory tb = new TaskToolboxFactory( null, - new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, null, null)), + new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)), null, null, null, @@ -181,7 +182,7 @@ public class TaskQueueTest Assert.assertNull("poll #2", tq.poll()); // report T1 done. Should cause T0 to be created - tq.notify(t1, t1.run(tb)); + tq.notify(t1, t1.run(tb.build(t1))); Assert.assertTrue("T0 isPresent (#2)", ts.getStatus("T0").isPresent()); Assert.assertTrue("T0 isRunnable (#2)", ts.getStatus("T0").get().isRunnable()); @@ -195,7 +196,7 @@ public class TaskQueueTest Assert.assertNull("poll #4", tq.poll()); // report T0 done. Should cause T0, T1 to be marked complete - tq.notify(t0, t0.run(tb)); + tq.notify(t0, t0.run(tb.build(t0))); Assert.assertTrue("T0 isPresent (#3)", ts.getStatus("T0").isPresent()); Assert.assertTrue("T0 isRunnable (#3)", !ts.getStatus("T0").get().isRunnable()); @@ -211,12 +212,12 @@ public class TaskQueueTest @Test public void testConcurrency() throws Exception { - final TaskStorage ts = new LocalTaskStorage(); + final TaskStorage ts = new HeapMemoryTaskStorage(); final TaskLockbox tl = new TaskLockbox(ts); final TaskQueue tq = newTaskQueue(ts, tl); - final TaskToolbox tb = new TaskToolbox( + final TaskToolboxFactory tb = new TaskToolboxFactory( null, - new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, null, null)), + new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)), null, null, null, @@ -248,7 +249,7 @@ public class TaskQueueTest Thread.sleep(5); // Finish t0 - tq.notify(t0, t0.run(tb)); + tq.notify(t0, t0.run(tb.build(t0))); // take max number of tasks final Set taken = Sets.newHashSet(); @@ -280,7 +281,7 @@ public class TaskQueueTest Assert.assertNull("null poll #2", tq.poll()); // Finish t3 - tq.notify(t3, t3.run(tb)); + tq.notify(t3, t3.run(tb.build(t3))); // We should be able to get t2 now final Task wt2 = tq.poll(); @@ -291,7 +292,7 @@ public class TaskQueueTest Assert.assertNull("null poll #3", tq.poll()); // Finish t2 - tq.notify(t2, t2.run(tb)); + tq.notify(t2, t2.run(tb.build(t2))); // We should be able to get t4 // And it should be in group G0, but that group should have a different version than last time @@ -305,14 +306,14 @@ public class TaskQueueTest Assert.assertNotSame("wt4 version", wt2Lock.getVersion(), wt4Lock.getVersion()); // Kind of done testing at this point, but let's finish t4 anyway - tq.notify(t4, t4.run(tb)); + tq.notify(t4, t4.run(tb.build(t4))); Assert.assertNull("null poll #4", tq.poll()); } @Test public void testBootstrap() throws Exception { - final TaskStorage storage = new LocalTaskStorage(); + final TaskStorage storage = new HeapMemoryTaskStorage(); final TaskLockbox lockbox = new TaskLockbox(storage); storage.insert(newTask("T1", "G1", "bar", new Interval("2011-01-01/P1D")), TaskStatus.running("T1")); @@ -374,7 +375,7 @@ public class TaskQueueTest @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, nextTasks)); + toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks)); return TaskStatus.success(id); } }; diff --git a/pom.xml b/pom.xml index 3c8f6e76b18..59a91567adb 100644 --- a/pom.xml +++ b/pom.xml @@ -165,17 +165,17 @@ com.fasterxml.jackson.core jackson-annotations - 2.1.2 + 2.1.4 com.fasterxml.jackson.core jackson-core - 2.1.3 + 2.1.4 com.fasterxml.jackson.core jackson-databind - 2.1.4-mmx-2 + 2.1.4 com.fasterxml.jackson.datatype @@ -190,12 +190,12 @@ com.fasterxml.jackson.dataformat jackson-dataformat-smile - 2.1.3 + 2.1.4 com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.1.3 + 2.1.4 org.codehaus.jackson diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java index 087e87d107f..fb47abab945 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeNode.java @@ -45,25 +45,15 @@ import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.LocalDataSegmentPusher; -import com.metamx.druid.loading.LocalDataSegmentPusherConfig; -import com.metamx.druid.loading.S3DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentPusherConfig; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.Monitor; - - - - -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; @@ -259,31 +249,7 @@ public class RealtimeNode extends BaseServerNode private void initializeSegmentPusher() { if (dataSegmentPusher == null) { - final Properties props = getProps(); - if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) { - dataSegmentPusher = new LocalDataSegmentPusher( - getConfigFactory().build(LocalDataSegmentPusherConfig.class), getJsonMapper() - ); - } - else { - - final RestS3Service s3Client; - try { - s3Client = new RestS3Service( - new AWSCredentials( - PropUtils.getProperty(props, "com.metamx.aws.accessKey"), - PropUtils.getProperty(props, "com.metamx.aws.secretKey") - ) - ); - } - catch (S3ServiceException e) { - throw Throwables.propagate(e); - } - - dataSegmentPusher = new S3DataSegmentPusher( - s3Client, getConfigFactory().build(S3DataSegmentPusherConfig.class), getJsonMapper() - ); - } + dataSegmentPusher = ServerInit.getSegmentPusher(getProps(), getConfigFactory(), getJsonMapper()); } } diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java index 7cd6caf3c1b..cca4f910b77 100644 --- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java +++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java @@ -19,17 +19,24 @@ package com.metamx.druid.initialization; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.DruidProcessingConfig; +import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.DelegatingSegmentLoader; import com.metamx.druid.loading.LocalDataSegmentPuller; +import com.metamx.druid.loading.LocalDataSegmentPusher; +import com.metamx.druid.loading.LocalDataSegmentPusherConfig; import com.metamx.druid.loading.MMappedQueryableIndexFactory; import com.metamx.druid.loading.QueryableIndexFactory; import com.metamx.druid.loading.S3DataSegmentPuller; +import com.metamx.druid.loading.S3DataSegmentPusher; +import com.metamx.druid.loading.S3DataSegmentPusherConfig; import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SingleSegmentLoader; import com.metamx.druid.query.group.GroupByQueryEngine; @@ -48,12 +55,16 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQuery; import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory; import com.metamx.druid.query.timeseries.TimeseriesQuery; import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import com.metamx.druid.utils.PropUtils; +import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.jets3t.service.security.AWSCredentials; import org.skife.config.ConfigurationObjectFactory; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; /** @@ -145,6 +156,34 @@ public class ServerInit return queryRunners; } + public static DataSegmentPusher getSegmentPusher( + final Properties props, + final ConfigurationObjectFactory configFactory, + final ObjectMapper jsonMapper + ) + { + if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) { + return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper); + } + else { + + final RestS3Service s3Client; + try { + s3Client = new RestS3Service( + new AWSCredentials( + PropUtils.getProperty(props, "com.metamx.aws.accessKey"), + PropUtils.getProperty(props, "com.metamx.aws.secretKey") + ) + ); + } + catch (S3ServiceException e) { + throw Throwables.propagate(e); + } + + return new S3DataSegmentPusher(s3Client, configFactory.build(S3DataSegmentPusherConfig.class), jsonMapper); + } + } + private static class ComputeScratchPool extends StupidPool { private static final Logger log = new Logger(ComputeScratchPool.class);