diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 0c5ec760c7e..08157d2ab43 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -41,8 +41,8 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.indexing.common.RetryPolicy; import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.TaskStatus; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; @@ -164,14 +164,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider event.getData().getData(), Worker.class ); - log.info("New worker[%s] found!", worker.getHost()); + log.info("Worker[%s] reportin' for duty!", worker.getHost()); addWorker(worker); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { final Worker worker = jsonMapper.readValue( event.getData().getData(), Worker.class ); - log.info("Worker[%s] removed!", worker.getHost()); + log.info("Kaboom! Worker[%s] removed!", worker.getHost()); removeWorker(worker); } } @@ -352,7 +352,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); try { - return new URL(String.format("http://%s/mmx/worker/v1%s", worker.getHost(), path)); + return new URL(String.format("http://%s/druid/worker/v1%s", worker.getHost(), path)); } catch (MalformedURLException e) { throw Throwables.propagate(e); @@ -555,7 +555,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final ZkWorker zkWorker = new ZkWorker( worker, - statusCache + statusCache, + jsonMapper ); // Add status listener to the watcher for status changes @@ -598,8 +599,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider worker.getHost(), taskId ); - } else { - zkWorker.addTask(taskRunnerWorkItem); } if (taskStatus.isComplete()) { @@ -608,7 +607,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (result != null) { ((SettableFuture) result).set(taskStatus); } - zkWorker.removeTask(taskRunnerWorkItem); } // Worker is done with this task @@ -621,7 +619,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); if (taskRunnerWorkItem != null) { log.info("Task %s just disappeared!", taskId); - zkWorker.removeTask(taskRunnerWorkItem); retryTask(taskRunnerWorkItem); } } @@ -711,12 +708,17 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider @Override public boolean apply(ZkWorker input) { + for (String taskId : input.getRunningTasks()) { + TaskRunnerWorkItem workerTask = runningTasks.get(taskId); + if (workerTask != null && task.getAvailabilityGroup() + .equalsIgnoreCase(workerTask.getTask().getAvailabilityGroup())) { + return false; + } + } return (!input.isAtCapacity() && input.getWorker() .getVersion() - .compareTo(workerSetupData.get().getMinVersion()) >= 0 && - !input.getAvailabilityGroups().contains(task.getAvailabilityGroup()) - ); + .compareTo(workerSetupData.get().getMinVersion()) >= 0); } } ) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java index 41cda473986..16b0f57f5c5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java @@ -20,11 +20,19 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.worker.Worker; +import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; + import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Set; @@ -36,17 +44,27 @@ public class ZkWorker implements Closeable { private final Worker worker; private final PathChildrenCache statusCache; - private final Set runningTasks; - private final Set availabilityGroups; + private final Function cacheConverter; private volatile DateTime lastCompletedTaskTime = new DateTime(); - public ZkWorker(Worker worker, PathChildrenCache statusCache) + public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { this.worker = worker; this.statusCache = statusCache; - this.runningTasks = Sets.newHashSet(); - this.availabilityGroups = Sets.newHashSet(); + this.cacheConverter = new Function() + { + @Override + public String apply(@Nullable ChildData input) + { + try { + return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; } @JsonProperty @@ -58,13 +76,12 @@ public class ZkWorker implements Closeable @JsonProperty public Set getRunningTasks() { - return runningTasks; - } - - @JsonProperty - public Set getAvailabilityGroups() - { - return availabilityGroups; + return Sets.newHashSet( + Lists.transform( + statusCache.getCurrentData(), + cacheConverter + ) + ); } @JsonProperty @@ -76,7 +93,7 @@ public class ZkWorker implements Closeable @JsonProperty public boolean isAtCapacity() { - return runningTasks.size() >= worker.getCapacity(); + return statusCache.getCurrentData().size() >= worker.getCapacity(); } public void setLastCompletedTaskTime(DateTime completedTaskTime) @@ -84,18 +101,6 @@ public class ZkWorker implements Closeable lastCompletedTaskTime = completedTaskTime; } - public void addTask(TaskRunnerWorkItem item) - { - runningTasks.add(item.getTask().getId()); - availabilityGroups.add(item.getTask().getAvailabilityGroup()); - } - - public void removeTask(TaskRunnerWorkItem item) - { - runningTasks.remove(item.getTask().getId()); - availabilityGroups.remove(item.getTask().getAvailabilityGroup()); - } - @Override public void close() throws IOException { @@ -107,8 +112,6 @@ public class ZkWorker implements Closeable { return "ZkWorker{" + "worker=" + worker + - ", runningTasks=" + runningTasks + - ", availabilityGroups=" + availabilityGroups + ", lastCompletedTaskTime=" + lastCompletedTaskTime + '}'; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index 77a9a1f036e..69a9d4dac88 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -311,7 +311,8 @@ public class IndexerCoordinatorNode extends QueryableNode * The monitor implements {@link QuerySegmentWalker} so tasks can offer up queryable data. This is useful for * realtime index tasks. */ @@ -96,7 +96,10 @@ public class WorkerTaskMonitor ); if (isTaskRunning(task)) { - log.warn("Got task %s that I am already running...", task.getId()); + log.warn( + "I can't build it. There's something in the way. Got task %s that I am already running...", + task.getId() + ); workerCuratorCoordinator.unannounceTask(task.getId()); return; } @@ -109,7 +112,7 @@ public class WorkerTaskMonitor { final long startTime = System.currentTimeMillis(); - log.info("Running task [%s]", task.getId()); + log.info("Affirmative. Running task [%s]", task.getId()); running.add(task); TaskStatus taskStatus; @@ -119,11 +122,12 @@ public class WorkerTaskMonitor taskStatus = taskRunner.run(task).get(); } catch (Exception e) { - log.makeAlert(e, "Failed to run task") + log.makeAlert(e, "I can't build there. Failed to run task") .addData("task", task.getId()) .emit(); taskStatus = TaskStatus.failure(task.getId()); - } finally { + } + finally { running.remove(task); } @@ -131,7 +135,11 @@ public class WorkerTaskMonitor try { workerCuratorCoordinator.updateStatus(taskStatus); - log.info("Completed task [%s] with status [%s]", task.getId(), taskStatus.getStatusCode()); + log.info( + "Job's finished. Completed [%s] with status [%s]", + task.getId(), + taskStatus.getStatusCode() + ); } catch (Exception e) { log.makeAlert(e, "Failed to update task status") diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java index b71056318a1..05d459d1d3a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java @@ -10,7 +10,7 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.Response; -@Path("/mmx/worker/v1") +@Path("/druid/worker/v1") public class ChatHandlerResource { private final ObjectMapper jsonMapper; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 0f1ed382b50..06c160106d5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -220,7 +220,7 @@ public class ExecutorNode extends BaseServerNode root.addServlet(new ServletHolder(new StatusServlet()), "/status"); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); root.addEventListener(new GuiceServletConfig(injector)); - root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0); + root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0); root.addServlet( new ServletHolder( new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger()) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index 3c98b41cf1e..1ccf32e0795 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -209,7 +209,7 @@ public class WorkerNode extends QueryableNode root.addServlet(new ServletHolder(new StatusServlet()), "/status"); root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); root.addEventListener(new GuiceServletConfig(injector)); - root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0); + root.addFilter(GuiceFilter.class, "/druid/worker/v1/*", 0); } @LifecycleStart diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerResource.java index 21c3a630fb5..5dc5301160e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerResource.java @@ -39,7 +39,7 @@ import java.io.InputStream; /** */ -@Path("/mmx/worker/v1") +@Path("/druid/worker/v1") public class WorkerResource { private static final Logger log = new Logger(WorkerResource.class); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 9b0d879e28d..0a11fcb49fb 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -10,7 +10,7 @@ import com.google.common.util.concurrent.Futures; import com.metamx.common.ISE; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.TaskStatus; @@ -23,14 +23,15 @@ import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.indexing.worker.WorkerCuratorCoordinator; import com.metamx.druid.indexing.worker.WorkerTaskMonitor; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.zookeeper.CreateMode; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -42,7 +43,6 @@ import org.junit.Before; import org.junit.Test; import java.io.File; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -81,6 +81,7 @@ public class RemoteTaskRunnerTest cf = CuratorFrameworkFactory.builder() .connectString(testingCluster.getConnectString()) .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) .build(); cf.start(); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 0a4b56fffcd..c44d555f798 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -334,7 +334,7 @@ public class SimpleResourceManagementStrategyTest Task testTask ) { - super(new Worker("host", "ip", 3, "version"), null); + super(new Worker("host", "ip", 3, "version"), null, null); this.testTask = testTask; }