diff --git a/api/pom.xml b/api/pom.xml
index 0e8333e2770..74a6f862f99 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -140,6 +140,17 @@
org.apache.maven.plugins
maven-release-plugin
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
diff --git a/api/src/main/java/io/druid/segment/loading/NoopDataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/NoopDataSegmentPusher.java
new file mode 100644
index 00000000000..cfef672f075
--- /dev/null
+++ b/api/src/main/java/io/druid/segment/loading/NoopDataSegmentPusher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.loading;
+
+import com.google.common.collect.ImmutableMap;
+import io.druid.timeline.DataSegment;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Mostly used for test purpose.
+ */
+public class NoopDataSegmentPusher implements DataSegmentPusher
+{
+ @Override
+ public String getPathForHadoop()
+ {
+ return "noop";
+ }
+
+ @Deprecated
+ @Override
+ public String getPathForHadoop(String dataSource)
+ {
+ return getPathForHadoop();
+ }
+
+ @Override
+ public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
+ {
+ return segment;
+ }
+
+ @Override
+ public Map makeLoadSpec(URI uri)
+ {
+ return ImmutableMap.of();
+ }
+}
diff --git a/api/src/test/java/io/druid/segment/loading/NoopDataSegmentArchiver.java b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentArchiver.java
new file mode 100644
index 00000000000..f6960ba3149
--- /dev/null
+++ b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentArchiver.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.loading;
+
+import io.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+
+/**
+ * Mostly used for test purpose.
+ */
+public class NoopDataSegmentArchiver implements DataSegmentArchiver
+{
+ @Nullable
+ @Override
+ public DataSegment archive(DataSegment segment) throws SegmentLoadingException
+ {
+ return segment;
+ }
+
+ @Nullable
+ @Override
+ public DataSegment restore(DataSegment segment) throws SegmentLoadingException
+ {
+ return segment;
+ }
+}
diff --git a/api/src/test/java/io/druid/segment/loading/NoopDataSegmentKiller.java b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentKiller.java
new file mode 100644
index 00000000000..2f114c2bd2f
--- /dev/null
+++ b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentKiller.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.loading;
+
+import io.druid.timeline.DataSegment;
+
+import java.io.IOException;
+
+/**
+ * Mostly used for test purpose.
+ */
+public class NoopDataSegmentKiller implements DataSegmentKiller
+{
+ @Override
+ public void kill(DataSegment segments) throws SegmentLoadingException
+ {
+ }
+
+ @Override
+ public void killAll() throws IOException
+ {
+ }
+}
diff --git a/api/src/test/java/io/druid/segment/loading/NoopDataSegmentMover.java b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentMover.java
new file mode 100644
index 00000000000..e4c276f5559
--- /dev/null
+++ b/api/src/test/java/io/druid/segment/loading/NoopDataSegmentMover.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.loading;
+
+import io.druid.timeline.DataSegment;
+
+import java.util.Map;
+
+/**
+ * Mostly used for test purpose.
+ */
+public class NoopDataSegmentMover implements DataSegmentMover
+{
+ @Override
+ public DataSegment move(
+ DataSegment segment, Map targetLoadSpec
+ )
+ {
+ return segment;
+ }
+}
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index cd15dc607b2..36d3b921f34 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -81,6 +81,13 @@
test-jar
test
+
+ io.druid
+ druid-api
+ ${project.parent.version}
+ test-jar
+ test
+
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java
similarity index 70%
rename from indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
rename to indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index daad56a8797..14c8f8cdef6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -23,16 +23,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.emitter.service.ServiceMetricEvent;
-import io.druid.java.util.common.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.guice.annotations.Self;
import io.druid.indexer.TaskLocation;
@@ -44,8 +38,14 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Numbers;
import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -57,40 +57,35 @@ import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
- * Runs tasks in a JVM thread using an ExecutorService.
+ * Runs a single task in a JVM thread using an ExecutorService.
*/
-public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
+public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalker
{
- private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
+ private static final EmittingLogger log = new EmittingLogger(SingleTaskBackgroundRunner.class);
private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
- private final ConcurrentMap exec = new ConcurrentHashMap<>();
- private final Set runningItems = new ConcurrentSkipListSet<>(
- ThreadPoolTaskRunnerWorkItem.COMPARATOR
- );
- private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>();
private final ServiceEmitter emitter;
private final TaskLocation location;
private final ServerConfig serverConfig;
- private volatile boolean stopping = false;
+ // Currently any listeners are registered in peons, but they might be used in the future.
+ private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>();
+
+ private volatile ListeningExecutorService executorService;
+ private volatile SingleTaskBackgroundRunnerWorkItem runningItem;
+ private volatile boolean stopping;
@Inject
- public ThreadPoolTaskRunner(
+ public SingleTaskBackgroundRunner(
TaskToolboxFactory toolboxFactory,
TaskConfig taskConfig,
ServiceEmitter emitter,
@@ -108,7 +103,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public List>> restore()
{
- return ImmutableList.of();
+ return Collections.emptyList();
}
@Override
@@ -127,8 +122,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
listeners.add(listenerPair);
log.info("Registered listener [%s]", listener.getListenerId());
- for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
- TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
+ if (runningItem != null) {
+ TaskRunnerUtils.notifyLocationChanged(
+ ImmutableList.of(listenerPair),
+ runningItem.getTaskId(),
+ runningItem.getLocation()
+ );
}
}
@@ -154,23 +153,30 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
);
}
+ @Override
+ @LifecycleStart
+ public void start()
+ {
+ // No state startup required
+ }
+
@Override
@LifecycleStop
public void stop()
{
stopping = true;
- for (Map.Entry entry : exec.entrySet()) {
+ if (executorService != null) {
try {
- entry.getValue().shutdown();
+ executorService.shutdown();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}
- for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
- final Task task = item.getTask();
+ if (runningItem != null) {
+ final Task task = runningItem.getTask();
final long start = System.currentTimeMillis();
final boolean graceful;
final long elapsed;
@@ -183,7 +189,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
try {
task.stopGracefully();
- final TaskStatus taskStatus = item.getResult().get(
+ final TaskStatus taskStatus = runningItem.getResult().get(
new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
);
@@ -225,9 +231,9 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
// Ok, now interrupt everything.
- for (Map.Entry entry : exec.entrySet()) {
+ if (executorService != null) {
try {
- entry.getValue().shutdownNow();
+ executorService.shutdownNow();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
@@ -238,87 +244,63 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public ListenableFuture run(final Task task)
{
- final TaskToolbox toolbox = toolboxFactory.build(task);
- final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
- int taskPriority = 0;
- if (taskPriorityObj != null) {
- if (taskPriorityObj instanceof Number) {
- taskPriority = ((Number) taskPriorityObj).intValue();
- } else if (taskPriorityObj instanceof String) {
- try {
- taskPriority = Integer.parseInt(taskPriorityObj.toString());
- }
- catch (NumberFormatException e) {
- log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
- }
+ if (runningItem == null) {
+ final TaskToolbox toolbox = toolboxFactory.build(task);
+ final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
+ int taskPriority = 0;
+ try {
+ taskPriority = taskPriorityObj == null ? 0 : Numbers.parseInt(taskPriorityObj);
}
- }
- // Ensure an executor for that priority exists
- if (!exec.containsKey(taskPriority)) {
- final ListeningExecutorService executorService = buildExecutorService(taskPriority);
- if (exec.putIfAbsent(taskPriority, executorService) != null) {
- // favor prior service
- executorService.shutdownNow();
+ catch (NumberFormatException e) {
+ log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
+ // Ensure an executor for that priority exists
+ executorService = buildExecutorService(taskPriority);
+ final ListenableFuture statusFuture = executorService.submit(
+ new SingleTaskBackgroundRunnerCallable(task, location, toolbox)
+ );
+ runningItem = new SingleTaskBackgroundRunnerWorkItem(
+ task,
+ location,
+ statusFuture
+ );
+
+ return statusFuture;
+ } else {
+ throw new ISE("Already running task[%s]", runningItem.getTask().getId());
}
- final ListenableFuture statusFuture = exec.get(taskPriority)
- .submit(new ThreadPoolTaskRunnerCallable(
- task,
- location,
- toolbox
- ));
- final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(
- task,
- location,
- statusFuture
- );
- runningItems.add(taskRunnerWorkItem);
- Futures.addCallback(
- statusFuture, new FutureCallback()
- {
- @Override
- public void onSuccess(TaskStatus result)
- {
- runningItems.remove(taskRunnerWorkItem);
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- runningItems.remove(taskRunnerWorkItem);
- }
- }
- );
-
- return statusFuture;
}
+ /**
+ * There might be a race between {@link #run(Task)} and this method, but it shouldn't happen in real applications
+ * because this method is called only in unit tests. See TaskLifecycleTest.
+ *
+ * @param taskid task ID to clean up resources for
+ */
@Override
public void shutdown(final String taskid)
{
- for (final TaskRunnerWorkItem runningItem : runningItems) {
- if (runningItem.getTaskId().equals(taskid)) {
- runningItem.getResult().cancel(true);
- }
+ if (runningItem != null && runningItem.getTask().getId().equals(taskid)) {
+ runningItem.getResult().cancel(true);
}
}
@Override
public Collection getRunningTasks()
{
- return ImmutableList.copyOf(runningItems);
+ return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem);
}
@Override
public Collection getPendingTasks()
{
- return ImmutableList.of();
+ return Collections.emptyList();
}
@Override
public Collection getKnownTasks()
{
- return ImmutableList.copyOf(runningItems);
+ return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem);
}
@Override
@@ -327,12 +309,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return Optional.absent();
}
- @Override
- public void start()
- {
- // No state startup required
- }
-
@Override
public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals)
{
@@ -350,8 +326,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
QueryRunner queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
- for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
- final Task task = taskRunnerWorkItem.getTask();
+ if (runningItem != null) {
+ final Task task = runningItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner taskQueryRunner = task.getQueryRunner(query);
@@ -367,30 +343,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
- return new SetAndVerifyContextQueryRunner(
+ return new SetAndVerifyContextQueryRunner<>(
serverConfig,
- queryRunner == null ? new NoopQueryRunner() : queryRunner
+ queryRunner == null ? new NoopQueryRunner<>() : queryRunner
);
}
- private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
+ private static class SingleTaskBackgroundRunnerWorkItem extends TaskRunnerWorkItem
{
- private static final Comparator COMPARATOR = new Comparator()
- {
- @Override
- public int compare(
- ThreadPoolTaskRunnerWorkItem lhs,
- ThreadPoolTaskRunnerWorkItem rhs
- )
- {
- return lhs.getTaskId().compareTo(rhs.getTaskId());
- }
- };
-
private final Task task;
private final TaskLocation location;
- private ThreadPoolTaskRunnerWorkItem(
+ private SingleTaskBackgroundRunnerWorkItem(
Task task,
TaskLocation location,
ListenableFuture result
@@ -425,13 +389,13 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
- private class ThreadPoolTaskRunnerCallable implements Callable
+ private class SingleTaskBackgroundRunnerCallable implements Callable
{
private final Task task;
private final TaskLocation location;
private final TaskToolbox toolbox;
- public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox)
+ SingleTaskBackgroundRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox)
{
this.task = task;
this.location = location;
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java
index e03ed5ebca3..5d0f2df7821 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java
@@ -44,6 +44,11 @@ public interface TaskRunner
*/
List>> restore();
+ /**
+ * Start the state of the runner.
+ */
+ void start();
+
/**
* Register a listener with this task runner. On registration, the listener will get events corresponding to the
* current state of known tasks.
@@ -95,12 +100,4 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional getScalingStats();
-
- /**
- * Start the state of the runner.
- *
- * This method is unused, but TaskRunner is {@link PublicApi}, so we cannot remove it.
- */
- @SuppressWarnings("unused")
- void start();
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
new file mode 100644
index 00000000000..301e18d0ef4
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.indexer.TaskState;
+import io.druid.indexing.common.SegmentLoaderFactory;
+import io.druid.indexing.common.TaskReportFileWriter;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.TaskToolboxFactory;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskActionClientFactory;
+import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.task.AbstractTask;
+import io.druid.indexing.common.task.NoopTask;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.segment.loading.NoopDataSegmentArchiver;
+import io.druid.segment.loading.NoopDataSegmentKiller;
+import io.druid.segment.loading.NoopDataSegmentMover;
+import io.druid.segment.loading.NoopDataSegmentPusher;
+import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import io.druid.server.DruidNode;
+import io.druid.server.coordination.NoopDataSegmentAnnouncer;
+import io.druid.server.initialization.ServerConfig;
+import io.druid.server.metrics.NoopServiceEmitter;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SingleTaskBackgroundRunnerTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private SingleTaskBackgroundRunner runner;
+
+ @Before
+ public void setup() throws IOException
+ {
+ final TestUtils utils = new TestUtils();
+ final DruidNode node = new DruidNode("testServer", "testHost", 1000, null, true, false);
+ final TaskConfig taskConfig = new TaskConfig(
+ temporaryFolder.newFile().toString(),
+ null,
+ null,
+ 50000,
+ null,
+ true,
+ null,
+ null
+ );
+ final ServiceEmitter emitter = new NoopServiceEmitter();
+ final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
+ taskConfig,
+ EasyMock.createMock(TaskActionClientFactory.class),
+ emitter,
+ new NoopDataSegmentPusher(),
+ new NoopDataSegmentKiller(),
+ new NoopDataSegmentMover(),
+ new NoopDataSegmentArchiver(),
+ new NoopDataSegmentAnnouncer(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ new SegmentLoaderFactory(EasyMock.createMock(SegmentLoaderLocalCacheManager.class)),
+ utils.getTestObjectMapper(),
+ utils.getTestIndexIO(),
+ null,
+ null,
+ utils.getTestIndexMergerV9(),
+ null,
+ node,
+ null,
+ null,
+ new TaskReportFileWriter(new File("fake"))
+ );
+ runner = new SingleTaskBackgroundRunner(
+ toolboxFactory,
+ taskConfig,
+ emitter,
+ node,
+ new ServerConfig()
+ );
+ }
+
+ @After
+ public void teardown()
+ {
+ runner.stop();
+ }
+
+ @Test
+ public void testRun() throws ExecutionException, InterruptedException
+ {
+ Assert.assertEquals(
+ TaskState.SUCCESS,
+ runner.run(new NoopTask(null, null, 500L, 0, null, null, null)).get().getStatusCode()
+ );
+ }
+
+ @Test
+ public void testStop() throws ExecutionException, InterruptedException, TimeoutException
+ {
+ final ListenableFuture future = runner.run(
+ new NoopTask(null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task
+ );
+ runner.stop();
+ Assert.assertEquals(
+ TaskState.FAILED,
+ future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
+ );
+ }
+
+ @Test
+ public void testStopWithRestorableTask() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ final BooleanHolder holder = new BooleanHolder();
+ final ListenableFuture future = runner.run(
+ new RestorableTask(holder)
+ );
+ runner.stop();
+ Assert.assertEquals(
+ TaskState.SUCCESS,
+ future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
+ );
+ Assert.assertTrue(holder.get());
+ }
+
+ private static class RestorableTask extends AbstractTask
+ {
+ private final BooleanHolder gracefullyStopped;
+
+ RestorableTask(BooleanHolder gracefullyStopped)
+ {
+ super("testId", "testDataSource", Collections.emptyMap());
+
+ this.gracefullyStopped = gracefullyStopped;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "restorable";
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient)
+ {
+ return true;
+ }
+
+ @Override
+ public TaskStatus run(TaskToolbox toolbox)
+ {
+ return TaskStatus.success(getId());
+ }
+
+ @Override
+ public boolean canRestore()
+ {
+ return true;
+ }
+
+ @Override
+ public void stopGracefully()
+ {
+ gracefullyStopped.set();
+ }
+ }
+
+ private static class BooleanHolder
+ {
+ private boolean value;
+
+ void set()
+ {
+ this.value = true;
+ }
+
+ boolean get()
+ {
+ return value;
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index ae847a597b6..8b0aa420998 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -617,7 +617,7 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(taskConfig);
Preconditions.checkNotNull(emitter);
- return new ThreadPoolTaskRunner(
+ return new SingleTaskBackgroundRunner(
tb,
taskConfig,
emitter,
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TestTaskRunner.java
new file mode 100644
index 00000000000..d6b36488766
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TestTaskRunner.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.concurrent.TaskThreadPriority;
+import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.TaskToolboxFactory;
+import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.autoscaling.ScalingStats;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Numbers;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QuerySegmentWalker;
+import io.druid.query.SegmentDescriptor;
+import org.joda.time.Interval;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs multiple tasks in a JVM thread using an ExecutorService. This task runner is supposed to be used only for unit
+ * tests.
+ */
+public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
+{
+ private static final EmittingLogger log = new EmittingLogger(TestTaskRunner.class);
+
+ private final ConcurrentMap exec = new ConcurrentHashMap<>();
+ private final Set runningItems = new ConcurrentSkipListSet<>();
+ private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>();
+
+ private final TaskToolboxFactory toolboxFactory;
+ private final TaskConfig taskConfig;
+ private final TaskLocation taskLocation;
+
+ private volatile boolean stopping = false;
+
+ public TestTaskRunner(
+ TaskToolboxFactory toolboxFactory,
+ TaskConfig taskConfig,
+ TaskLocation taskLocation
+ )
+ {
+ this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
+ this.taskConfig = taskConfig;
+ this.taskLocation = taskLocation;
+ }
+
+ @Override
+ public List>> restore()
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void registerListener(TaskRunnerListener listener, Executor executor)
+ {
+ for (Pair pair : listeners) {
+ if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
+ throw new ISE("Listener [%s] already registered", listener.getListenerId());
+ }
+ }
+
+ final Pair listenerPair = Pair.of(listener, executor);
+
+ // Location never changes for an existing task, so it's ok to add the listener first and then issue bootstrap
+ // callbacks without any special synchronization.
+
+ listeners.add(listenerPair);
+ log.info("Registered listener [%s]", listener.getListenerId());
+ for (TestTaskRunnerWorkItem item : runningItems) {
+ TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
+ }
+ }
+
+ @Override
+ public void unregisterListener(String listenerId)
+ {
+ for (Pair pair : listeners) {
+ if (pair.lhs.getListenerId().equals(listenerId)) {
+ listeners.remove(pair);
+ log.info("Unregistered listener [%s]", listenerId);
+ return;
+ }
+ }
+ }
+
+ private static ListeningExecutorService buildExecutorService(int priority)
+ {
+ return MoreExecutors.listeningDecorator(
+ Execs.singleThreaded(
+ "test-task-runner-%d-priority-" + priority,
+ TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
+ )
+ );
+ }
+
+ @Override
+ public void stop()
+ {
+ stopping = true;
+
+ for (Map.Entry entry : exec.entrySet()) {
+ try {
+ entry.getValue().shutdown();
+ }
+ catch (SecurityException ex) {
+ throw new RuntimeException("I can't control my own threads!", ex);
+ }
+ }
+
+ for (TestTaskRunnerWorkItem item : runningItems) {
+ final Task task = item.getTask();
+ final long start = System.currentTimeMillis();
+
+ if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
+ // Attempt graceful shutdown.
+ log.info("Starting graceful shutdown of task[%s].", task.getId());
+
+ try {
+ task.stopGracefully();
+ final TaskStatus taskStatus = item.getResult().get(
+ new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
+ TimeUnit.MILLISECONDS
+ );
+
+ // Ignore status, it doesn't matter for graceful shutdowns.
+ log.info(
+ "Graceful shutdown of task[%s] finished in %,dms.",
+ task.getId(),
+ System.currentTimeMillis() - start
+ );
+
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
+ }
+ catch (Exception e) {
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+ throw new RE(e, "Graceful shutdown of task[%s] aborted with exception", task.getId());
+ }
+ } else {
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
+ }
+ }
+
+ // Ok, now interrupt everything.
+ for (Map.Entry entry : exec.entrySet()) {
+ try {
+ entry.getValue().shutdownNow();
+ }
+ catch (SecurityException ex) {
+ throw new RuntimeException("I can't control my own threads!", ex);
+ }
+ }
+ }
+
+ @Override
+ public ListenableFuture run(final Task task)
+ {
+ final TaskToolbox toolbox = toolboxFactory.build(task);
+ final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
+ int taskPriority = 0;
+ try {
+ taskPriority = taskPriorityObj == null ? 0 : Numbers.parseInt(taskPriorityObj);
+ }
+ catch (NumberFormatException e) {
+ log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
+ }
+ final int finalTaskPriority = taskPriority;
+ final ListenableFuture statusFuture = exec
+ .computeIfAbsent(taskPriority, k -> buildExecutorService(finalTaskPriority))
+ .submit(new TestTaskRunnerCallable(task, toolbox));
+ final TestTaskRunnerWorkItem taskRunnerWorkItem = new TestTaskRunnerWorkItem(task, statusFuture);
+ runningItems.add(taskRunnerWorkItem);
+ Futures.addCallback(
+ statusFuture,
+ new FutureCallback()
+ {
+ @Override
+ public void onSuccess(TaskStatus result)
+ {
+ runningItems.remove(taskRunnerWorkItem);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ runningItems.remove(taskRunnerWorkItem);
+ }
+ }
+ );
+
+ return statusFuture;
+ }
+
+ @Override
+ public void shutdown(final String taskid)
+ {
+ for (final TaskRunnerWorkItem runningItem : runningItems) {
+ if (runningItem.getTaskId().equals(taskid)) {
+ runningItem.getResult().cancel(true);
+ }
+ }
+ }
+
+ @Override
+ public Collection getRunningTasks()
+ {
+ return ImmutableList.copyOf(runningItems);
+ }
+
+ @Override
+ public Collection getPendingTasks()
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public Collection getKnownTasks()
+ {
+ return ImmutableList.copyOf(runningItems);
+ }
+
+ @Override
+ public Optional getScalingStats()
+ {
+ return Optional.absent();
+ }
+
+ @Override
+ public void start()
+ {
+ // No state startup required
+ }
+
+ @Override
+ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
+ implements Comparable
+ {
+ private final Task task;
+
+ private TestTaskRunnerWorkItem(Task task, ListenableFuture result)
+ {
+ super(task.getId(), result);
+ this.task = task;
+ }
+
+ public Task getTask()
+ {
+ return task;
+ }
+
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.create("testHost", 10000, 10000);
+ }
+
+ @Override
+ public String getTaskType()
+ {
+ return task.getType();
+ }
+
+ @Override
+ public String getDataSource()
+ {
+ return task.getDataSource();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+
+ if (o instanceof TestTaskRunnerWorkItem) {
+ final TestTaskRunnerWorkItem that = (TestTaskRunnerWorkItem) o;
+ return task.getId().equals(that.task.getId());
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return task.getId().hashCode();
+ }
+
+ @Override
+ public int compareTo(TestTaskRunnerWorkItem o)
+ {
+ return task.getId().compareTo(o.task.getId());
+ }
+ }
+
+ private class TestTaskRunnerCallable implements Callable
+ {
+ private final Task task;
+ private final TaskToolbox toolbox;
+
+ public TestTaskRunnerCallable(Task task, TaskToolbox toolbox)
+ {
+ this.task = task;
+ this.toolbox = toolbox;
+ }
+
+ @Override
+ public TaskStatus call()
+ {
+ final long startTime = System.currentTimeMillis();
+
+ TaskStatus status;
+
+ try {
+ log.info("Running task: %s", task.getId());
+ TaskRunnerUtils.notifyLocationChanged(
+ listeners,
+ task.getId(),
+ taskLocation
+ );
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
+ status = task.run(toolbox);
+ }
+ catch (InterruptedException e) {
+ // Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable.
+ if (stopping) {
+ // Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this.
+ log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task);
+ } else {
+ // Not stopping, this is definitely unexpected.
+ log.warn(e, "Interrupted while running task[%s]", task);
+ }
+
+ status = TaskStatus.failure(task.getId());
+ }
+ catch (Exception e) {
+ log.error(e, "Exception while running task[%s]", task);
+ status = TaskStatus.failure(task.getId());
+ }
+ catch (Throwable t) {
+ throw new RE(t, "Uncaught Throwable while running task[%s]", task);
+ }
+
+ status = status.withDuration(System.currentTimeMillis() - startTime);
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
+ return status;
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
index 9be677ae07b..53d71b662c6 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -22,7 +22,6 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.discovery.DruidLeaderClient;
import io.druid.indexer.TaskLocation;
@@ -38,18 +37,15 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
-import io.druid.indexing.overlord.ThreadPoolTaskRunner;
+import io.druid.indexing.overlord.TestTaskRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
-import io.druid.server.DruidNode;
import io.druid.server.coordination.ChangeRequestHistory;
import io.druid.server.coordination.ChangeRequestsSnapshot;
-import io.druid.server.initialization.ServerConfig;
-import io.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@@ -57,14 +53,14 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.util.Collections;
import java.util.List;
/**
*/
public class WorkerTaskManagerTest
{
- private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", 9000, null, true, false);
-
+ private final TaskLocation location = TaskLocation.create("localhost", 1, 2);
private final ObjectMapper jsonMapper;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@@ -98,26 +94,33 @@ public class WorkerTaskManagerTest
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
+ final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public List getLocations()
+ {
+ return Collections.emptyList();
+ }
+ };
+
return new WorkerTaskManager(
jsonMapper,
- new ThreadPoolTaskRunner(
+ new TestTaskRunner(
new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
- null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
- new SegmentLoaderLocalCacheManager(
- null,
- new SegmentLoaderConfig()
- {
- @Override
- public List getLocations()
- {
- return Lists.newArrayList();
- }
- },
- jsonMapper
- )
- ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ notifierFactory,
+ null,
+ null,
+ null,
+ new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager(null, loaderConfig, jsonMapper)),
jsonMapper,
indexIO,
null,
@@ -130,9 +133,7 @@ public class WorkerTaskManagerTest
new NoopTestTaskFileWriter()
),
taskConfig,
- new NoopServiceEmitter(),
- DUMMY_NODE,
- new ServerConfig()
+ location
),
taskConfig,
EasyMock.createNiceMock(DruidLeaderClient.class)
@@ -181,7 +182,7 @@ public class WorkerTaskManagerTest
TaskAnnouncement.create(
task2,
TaskStatus.success(task2.getId()),
- TaskLocation.create("localhost", 1, 2)
+ location
)
);
@@ -196,11 +197,9 @@ public class WorkerTaskManagerTest
Assert.assertTrue(new File(workerTaskManager.getCompletedTaskDir(), task1.getId()).exists());
Assert.assertFalse(new File(workerTaskManager.getAssignedTaskDir(), task1.getId()).exists());
- ChangeRequestsSnapshot baseHistory = workerTaskManager.getChangesSince(
- new ChangeRequestHistory.Counter(
- -1,
- 0
- )).get();
+ ChangeRequestsSnapshot baseHistory = workerTaskManager
+ .getChangesSince(new ChangeRequestHistory.Counter(-1, 0))
+ .get();
Assert.assertFalse(baseHistory.isResetCounter());
Assert.assertEquals(3, baseHistory.getRequests().size());
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
index 51d27f0b2a4..0c8781aa995 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -39,7 +39,7 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
-import io.druid.indexing.overlord.ThreadPoolTaskRunner;
+import io.druid.indexing.overlord.SingleTaskBackgroundRunner;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@@ -165,7 +165,7 @@ public class WorkerTaskMonitorTest
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
return new WorkerTaskMonitor(
jsonMapper,
- new ThreadPoolTaskRunner(
+ new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
diff --git a/java-util/src/main/java/io/druid/java/util/common/Numbers.java b/java-util/src/main/java/io/druid/java/util/common/Numbers.java
new file mode 100644
index 00000000000..02ff20bbc72
--- /dev/null
+++ b/java-util/src/main/java/io/druid/java/util/common/Numbers.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common;
+
+public final class Numbers
+{
+ /**
+ * Parse the given object as a {@code long}. The input object can be a {@link String} or one of the implementations of
+ * {@link Number}. You may want to use {@code GuavaUtils.tryParseLong()} instead if the input is a nullable string and
+ * you want to avoid any exceptions.
+ *
+ * @throws NumberFormatException if the input is an unparseable string.
+ * @throws NullPointerException if the input is null.
+ * @throws ISE if the input is not a string or a number.
+ */
+ public static long parseLong(Object val)
+ {
+ if (val instanceof String) {
+ return Long.parseLong((String) val);
+ } else if (val instanceof Number) {
+ return ((Number) val).longValue();
+ } else {
+ if (val == null) {
+ throw new NullPointerException("Input is null");
+ } else {
+ throw new ISE("Unknown type [%s]", val.getClass());
+ }
+ }
+ }
+
+ /**
+ * Parse the given object as a {@code int}. The input object can be a {@link String} or one of the implementations of
+ * {@link Number}.
+ *
+ * @throws NumberFormatException if the input is an unparseable string.
+ * @throws NullPointerException if the input is null.
+ * @throws ISE if the input is not a string or a number.
+ */
+ public static int parseInt(Object val)
+ {
+ if (val instanceof String) {
+ return Integer.parseInt((String) val);
+ } else if (val instanceof Number) {
+ return ((Number) val).intValue();
+ } else {
+ if (val == null) {
+ throw new NullPointerException("Input is null");
+ } else {
+ throw new ISE("Unknown type [%s]", val.getClass());
+ }
+ }
+ }
+
+ /**
+ * Parse the given object as a {@code boolean}. The input object can be a {@link String} or {@link Boolean}.
+ *
+ * @return {@code true} only if the input is a {@link Boolean} representing {@code true} or a {@link String} of
+ * {@code "true"}.
+ *
+ * @throws NullPointerException if the input is null.
+ * @throws ISE if the input is not a string or a number.
+ */
+ public static boolean parseBoolean(Object val)
+ {
+ if (val instanceof String) {
+ return Boolean.parseBoolean((String) val);
+ } else if (val instanceof Boolean) {
+ return (boolean) val;
+ } else {
+ if (val == null) {
+ throw new NullPointerException("Input is null");
+ } else {
+ throw new ISE("Unknown type [%s]", val.getClass());
+ }
+ }
+ }
+
+ private Numbers()
+ {
+ }
+}
diff --git a/java-util/src/test/java/io/druid/java/util/common/NumbersTest.java b/java-util/src/test/java/io/druid/java/util/common/NumbersTest.java
new file mode 100644
index 00000000000..04735f585ea
--- /dev/null
+++ b/java-util/src/test/java/io/druid/java/util/common/NumbersTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.java.util.common;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class NumbersTest
+{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testParseLong()
+ {
+ final String strVal = "100";
+ Assert.assertEquals(100L, Numbers.parseLong(strVal));
+
+ final Long longVal = 100L;
+ Assert.assertEquals(100L, Numbers.parseLong(longVal));
+
+ final Double doubleVal = 100.;
+ Assert.assertEquals(100L, Numbers.parseLong(doubleVal));
+ }
+
+ @Test
+ public void testParseLongWithNull()
+ {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("Input is null");
+ Numbers.parseLong(null);
+ }
+
+ @Test
+ public void testParseLongWithUnparseableString()
+ {
+ expectedException.expect(NumberFormatException.class);
+ Numbers.parseLong("unparseable");
+ }
+
+ @Test
+ public void testParseLongWithUnparseableObject()
+ {
+ expectedException.expect(ISE.class);
+ expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
+ Numbers.parseLong(new Object());
+ }
+
+ @Test
+ public void testParseInt()
+ {
+ final String strVal = "100";
+ Assert.assertEquals(100, Numbers.parseInt(strVal));
+
+ final Integer longVal = 100;
+ Assert.assertEquals(100, Numbers.parseInt(longVal));
+
+ final Float floatVal = 100.F;
+ Assert.assertEquals(100, Numbers.parseInt(floatVal));
+ }
+
+ @Test
+ public void testParseIntWithNull()
+ {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("Input is null");
+ Numbers.parseInt(null);
+ }
+
+ @Test
+ public void testParseIntWithUnparseableString()
+ {
+ expectedException.expect(NumberFormatException.class);
+ Numbers.parseInt("unparseable");
+ }
+
+ @Test
+ public void testParseIntWithUnparseableObject()
+ {
+ expectedException.expect(ISE.class);
+ expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
+ Numbers.parseInt(new Object());
+ }
+
+ @Test
+ public void testParseBoolean()
+ {
+ final String strVal = "false";
+ Assert.assertEquals(false, Numbers.parseBoolean(strVal));
+
+ final Boolean booleanVal = Boolean.FALSE;
+ Assert.assertEquals(false, Numbers.parseBoolean(booleanVal));
+ }
+
+ @Test
+ public void testParseBooleanWithNull()
+ {
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage("Input is null");
+ Numbers.parseBoolean(null);
+ }
+
+ @Test
+ public void testParseBooleanWithUnparseableObject()
+ {
+ expectedException.expect(ISE.class);
+ expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
+ Numbers.parseBoolean(new Object());
+ }
+}
diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java
index d88a536eb9e..6a8ae0d5d8f 100644
--- a/processing/src/main/java/io/druid/query/QueryContexts.java
+++ b/processing/src/main/java/io/druid/query/QueryContexts.java
@@ -23,7 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Numbers;
@PublicApi
public class QueryContexts
@@ -212,46 +212,23 @@ public class QueryContexts
static long parseLong(Query query, String key, long defaultValue)
{
- Object val = query.getContextValue(key);
- if (val == null) {
- return defaultValue;
- }
- if (val instanceof String) {
- return Long.parseLong((String) val);
- } else if (val instanceof Number) {
- return ((Number) val).longValue();
- } else {
- throw new ISE("Unknown type [%s]", val.getClass());
- }
+ final Object val = query.getContextValue(key);
+ return val == null ? defaultValue : Numbers.parseLong(val);
}
static int parseInt(Query query, String key, int defaultValue)
{
- Object val = query.getContextValue(key);
- if (val == null) {
- return defaultValue;
- }
- if (val instanceof String) {
- return Integer.parseInt((String) val);
- } else if (val instanceof Number) {
- return ((Number) val).intValue();
- } else {
- throw new ISE("Unknown type [%s]", val.getClass());
- }
+ final Object val = query.getContextValue(key);
+ return val == null ? defaultValue : Numbers.parseInt(val);
}
static boolean parseBoolean(Query query, String key, boolean defaultValue)
{
- Object val = query.getContextValue(key);
- if (val == null) {
- return defaultValue;
- }
- if (val instanceof String) {
- return Boolean.parseBoolean((String) val);
- } else if (val instanceof Boolean) {
- return (boolean) val;
- } else {
- throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
- }
+ final Object val = query.getContextValue(key);
+ return val == null ? defaultValue : Numbers.parseBoolean(val);
+ }
+
+ private QueryContexts()
+ {
}
}
diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
index e9ee6959ea3..9540e897bb1 100644
--- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
@@ -116,7 +116,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
- new SetAndVerifyContextQueryRunner(
+ new SetAndVerifyContextQueryRunner<>(
serverConfig,
new RetryQueryRunner<>(
baseClientRunner,
diff --git a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java
index 637b9dd14fb..9d5a355aaa6 100644
--- a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java
+++ b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java
@@ -31,38 +31,32 @@ import java.util.Map;
/**
* Use this QueryRunner to set and verify Query contexts.
*/
-public class SetAndVerifyContextQueryRunner implements QueryRunner
+public class SetAndVerifyContextQueryRunner implements QueryRunner
{
private final ServerConfig serverConfig;
- private final QueryRunner baseRunner;
+ private final QueryRunner baseRunner;
- public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner)
+ public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner)
{
this.serverConfig = serverConfig;
this.baseRunner = baseRunner;
}
@Override
- public Sequence run(QueryPlus queryPlus, Map responseContext)
+ public Sequence run(QueryPlus queryPlus, Map responseContext)
{
return baseRunner.run(
- QueryPlus.wrap((Query) withTimeoutAndMaxScatterGatherBytes(
- queryPlus.getQuery(),
- serverConfig
- )),
+ QueryPlus.wrap(withTimeoutAndMaxScatterGatherBytes(queryPlus.getQuery(), serverConfig)),
responseContext
);
}
- public static > QueryType withTimeoutAndMaxScatterGatherBytes(
- final QueryType query,
- ServerConfig serverConfig
- )
+ public Query withTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig serverConfig)
{
- return (QueryType) QueryContexts.verifyMaxQueryTimeout(
+ return QueryContexts.verifyMaxQueryTimeout(
QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
- (Query) query,
+ query,
Math.min(serverConfig.getDefaultQueryTimeout(), serverConfig.getMaxQueryTimeout())
),
serverConfig.getMaxScatterGatherBytes()
diff --git a/server/src/main/java/io/druid/server/coordination/NoopDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/NoopDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..e232eed4bd8
--- /dev/null
+++ b/server/src/main/java/io/druid/server/coordination/NoopDataSegmentAnnouncer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordination;
+
+import io.druid.timeline.DataSegment;
+
+/**
+ * Mostly used for test purpose.
+ */
+public class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
+{
+ @Override
+ public void announceSegment(DataSegment segment)
+ {
+ }
+
+ @Override
+ public void unannounceSegment(DataSegment segment)
+ {
+ }
+
+ @Override
+ public void announceSegments(Iterable segments)
+ {
+ }
+
+ @Override
+ public void unannounceSegments(Iterable segments)
+ {
+ }
+}
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 7cee07003cb..52fdf36cf90 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
@@ -33,6 +31,8 @@ import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.FunctionalIterable;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.DataSource;
@@ -280,26 +280,26 @@ public class ServerManager implements QuerySegmentWalker
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
String segmentId = adapter.getIdentifier();
- return new SetAndVerifyContextQueryRunner(
+ return new SetAndVerifyContextQueryRunner<>(
serverConfig,
CPUTimeMetricQueryRunner.safeBuild(
- new SpecificSegmentQueryRunner(
- new MetricsEmittingQueryRunner(
+ new SpecificSegmentQueryRunner<>(
+ new MetricsEmittingQueryRunner<>(
emitter,
toolChest,
- new BySegmentQueryRunner(
+ new BySegmentQueryRunner<>(
segmentId,
adapter.getDataInterval().getStart(),
- new CachingQueryRunner(
+ new CachingQueryRunner<>(
segmentId,
segmentDescriptor,
objectMapper,
cache,
toolChest,
- new MetricsEmittingQueryRunner(
+ new MetricsEmittingQueryRunner<>(
emitter,
toolChest,
- new ReferenceCountingSegmentQueryRunner(factory, adapter, segmentDescriptor),
+ new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor),
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(segmentId)
),
diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index d7da80bae62..aa6f0b13583 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -67,7 +67,7 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage;
-import io.druid.indexing.overlord.ThreadPoolTaskRunner;
+import io.druid.indexing.overlord.SingleTaskBackgroundRunner;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.java.util.common.lifecycle.Lifecycle;
@@ -207,9 +207,9 @@ public class CliPeon extends GuiceRunnable
)
);
- binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
- binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
- binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
+ binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
+ binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
+ binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
index f7c7cbc8285..cdc8301244f 100644
--- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java
+++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
@@ -20,7 +20,6 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
@@ -37,15 +36,14 @@ import io.druid.guice.RealtimeModule;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.lookup.LookupModule;
import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.NoopDataSegmentPusher;
import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.coordination.NoopDataSegmentAnnouncer;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.timeline.DataSegment;
-import java.io.File;
-import java.net.URI;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@@ -136,60 +134,4 @@ public class CliRealtimeExample extends ServerRunnable
return false;
}
}
-
- private static class NoopDataSegmentPusher implements DataSegmentPusher
- {
-
- @Override
- public String getPathForHadoop()
- {
- return "noop";
- }
-
- @Deprecated
- @Override
- public String getPathForHadoop(String dataSource)
- {
- return getPathForHadoop();
- }
-
- @Override
- public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
- {
- return segment;
- }
-
- @Override
- public Map makeLoadSpec(URI uri)
- {
- return ImmutableMap.of();
- }
- }
-
- private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
- {
- @Override
- public void announceSegment(DataSegment segment)
- {
- // do nothing
- }
-
- @Override
- public void unannounceSegment(DataSegment segment)
- {
- // do nothing
- }
-
- @Override
- public void announceSegments(Iterable segments)
- {
- // do nothing
- }
-
- @Override
- public void unannounceSegments(Iterable segments)
- {
- // do nothing
- }
- }
}