diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index 8080bb3e6db..b7c9d9305fd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -40,6 +40,8 @@ import java.io.IOException; public class RealtimeIndexTask extends AbstractTask { + private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); + @JsonIgnore final Schema schema; @@ -67,8 +69,6 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private volatile boolean shutdown = false; - private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); - @JsonCreator public RealtimeIndexTask( @JsonProperty("id") String id, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java index f480ea4ba6c..80fa5871768 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/ForkingTaskRunner.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 80e382bdd38..42957be4765 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -19,7 +19,6 @@ package com.metamx.druid.merger.coordinator; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.base.Joiner; @@ -54,8 +53,7 @@ import org.apache.zookeeper.CreateMode; import org.joda.time.DateTime; import org.joda.time.Duration; -import java.io.IOException; -import java.net.URI; +import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; import java.util.Comparator; @@ -82,10 +80,13 @@ import java.util.concurrent.atomic.AtomicReference; *
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks * that were associated with the node. + * + * The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ public class RemoteTaskRunner implements TaskRunner { private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class); + private static final ToStringResponseHandler responseHandler = new ToStringResponseHandler(Charsets.UTF_8); private static final Joiner JOINER = Joiner.on("/"); private final ObjectMapper jsonMapper; @@ -239,6 +240,10 @@ public class RemoteTaskRunner implements TaskRunner return taskRunnerWorkItem.getResult(); } + /** + * Finds the worker running the task and forwards the shutdown signal to the worker. + * @param taskId + */ @Override public void shutdown(String taskId) { @@ -249,20 +254,19 @@ public class RemoteTaskRunner implements TaskRunner return; } - final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); + final RetryPolicy shutdownRetryPolicy = retryPolicyFactory.makeRetryPolicy(); URL url; try { - url = new URL(String.format("http://%s/mmx/v1/worker/shutdown", zkWorker.getWorker().getHost())); + url = new URL(String.format("http://%s/mmx/v1/worker/task/%s/shutdown", zkWorker.getWorker().getHost(), taskId)); } - catch (Exception e) { + catch (MalformedURLException e) { throw Throwables.propagate(e); } - while (!retryPolicy.hasExceededRetryThreshold()) { + while (!shutdownRetryPolicy.hasExceededRetryThreshold()) { try { final String response = httpClient.post(url) - .setContent("application/json", jsonMapper.writeValueAsBytes(taskId)) - .go(new ToStringResponseHandler(Charsets.UTF_8)) + .go(responseHandler) .get(); log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response); @@ -271,11 +275,11 @@ public class RemoteTaskRunner implements TaskRunner catch (Exception e) { log.error(e, "Exception shutting down taskId: %s", taskId); - if (retryPolicy.hasExceededRetryThreshold()) { + if (shutdownRetryPolicy.hasExceededRetryThreshold()) { throw Throwables.propagate(e); } else { try { - final long sleepTime = retryPolicy.getAndIncrementRetryDelay().getMillis(); + final long sleepTime = shutdownRetryPolicy.getAndIncrementRetryDelay().getMillis(); log.info("Will try again in %s.", new Duration(sleepTime).toString()); Thread.sleep(sleepTime); } @@ -287,6 +291,10 @@ public class RemoteTaskRunner implements TaskRunner } } + /** + * Adds a task to the pending queue + * @param taskRunnerWorkItem + */ private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) { log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId()); diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerResource.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerResource.java index 02adeb427b0..9c3617fb018 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerResource.java @@ -1,13 +1,33 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.worker.http; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.merger.coordinator.ForkingTaskRunner; -import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Response; @@ -33,17 +53,16 @@ public class WorkerResource } @POST - @Path("/shutdown") - @Consumes("application/json") + @Path("/task/{taskid}/shutdown") @Produces("application/json") - public Response doShutdown(final String taskId) + public Response doShutdown(@PathParam("taskid") String taskid) { try { - taskRunner.shutdown(taskId); + taskRunner.shutdown(taskid); } catch (Exception e) { return Response.serverError().build(); } - return Response.ok().build(); + return Response.ok(ImmutableMap.of("task", taskid)).build(); } } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java index 23c7122be38..05aff8e7c8c 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/GracefulShutdownFirehose.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.realtime; import com.google.common.base.Throwables; @@ -8,7 +27,6 @@ import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.input.InputRow; import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory; import com.metamx.druid.realtime.plumber.RejectionPolicy; -import org.apache.commons.lang.mutable.MutableBoolean; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -32,8 +50,11 @@ public class GracefulShutdownFirehose implements Firehose private final ScheduledExecutorService scheduledExecutor; private final RejectionPolicy rejectionPolicy; - private final AtomicBoolean hasMore = new AtomicBoolean(true); - private volatile boolean shutdown = false; + // when this is set to false, the firehose will have no more rows + private final AtomicBoolean valveOn = new AtomicBoolean(true); + + // when this is set to true, the firehose will begin rejecting events + private volatile boolean beginRejectionPolicy = false; public GracefulShutdownFirehose( Firehose firehose, @@ -64,7 +85,7 @@ public class GracefulShutdownFirehose implements Firehose final long end = segmentGranularity.increment(truncatedNow) + windowMillis; final Duration timeUntilShutdown = new Duration(System.currentTimeMillis(), end); - log.info("Shutting down in %s. Time at shutdown: ~%s", timeUntilShutdown, new DateTime(end)); + log.info("Shutdown at approx. %s (in %s)", new DateTime(end), timeUntilShutdown); ScheduledExecutors.scheduleWithFixedDelay( scheduledExecutor, @@ -75,7 +96,7 @@ public class GracefulShutdownFirehose implements Firehose public ScheduledExecutors.Signal call() throws Exception { try { - hasMore.set(false); + valveOn.set(false); } catch (Exception e) { throw Throwables.propagate(e); @@ -86,13 +107,13 @@ public class GracefulShutdownFirehose implements Firehose } ); - shutdown = true; + beginRejectionPolicy = true; } @Override public boolean hasMore() { - return hasMore.get() && firehose.hasMore(); + return valveOn.get() && firehose.hasMore(); } @Override @@ -100,7 +121,7 @@ public class GracefulShutdownFirehose implements Firehose { InputRow next = firehose.nextRow(); - if (!shutdown || rejectionPolicy.accept(next.getTimestampFromEpoch())) { + if (!beginRejectionPolicy || rejectionPolicy.accept(next.getTimestampFromEpoch())) { return next; } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index fee103c57dc..287f313fce9 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -91,6 +91,8 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); + private ServiceEmitter emitter; + private volatile VersioningPolicy versioningPolicy = null; private volatile RejectionPolicyFactory rejectionPolicyFactory = null; private volatile QueryRunnerFactoryConglomerate conglomerate = null; @@ -98,7 +100,6 @@ public class RealtimePlumberSchool implements PlumberSchool private volatile SegmentAnnouncer segmentAnnouncer = null; private volatile SegmentPublisher segmentPublisher = null; private volatile ServerView serverView = null; - private ServiceEmitter emitter; @JsonCreator public RealtimePlumberSchool( @@ -310,7 +311,7 @@ public class RealtimePlumberSchool implements PlumberSchool while (!sinks.isEmpty()) { try { log.info( - "Cannot shut down yet! Sinks remain: %s", + "Cannot shut down yet! Sinks remaining: %s", Joiner.on(", ").join( Iterables.transform( sinks.values(), @@ -337,7 +338,6 @@ public class RealtimePlumberSchool implements PlumberSchool // scheduledExecutor is shutdown here, but persistExecutor is shutdown when the // ServerView sends it a new segment callback - if (scheduledExecutor != null) { scheduledExecutor.shutdown(); }