Simple cleanup for ThreadPoolTaskRunner and SetAndVerifyContextQueryRunner / Add ThreadPoolTaskRunnerTest (#5557)

* Simple fix for ThreadPoolTaskRunner

* fix build

* address comments

* update javadoc

* fix build

* fix test

* add dependency
Jihoon Son 2018-05-15 10:23:11 -07:00 committed by Nishant Bangarwa
parent c73e3ea4f5
commit 9dca5ec76b
22 changed files with 1248 additions and 279 deletions

View File

@ -140,6 +140,17 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.net.URI;
import java.util.Map;
/**
* Mostly used for test purposes.
*/
public class NoopDataSegmentPusher implements DataSegmentPusher
{
@Override
public String getPathForHadoop()
{
return "noop";
}
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public DataSegment push(File file, DataSegment segment, boolean replaceExisting)
{
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
return ImmutableMap.of();
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
/**
* Mostly used for test purposes.
*/
public class NoopDataSegmentArchiver implements DataSegmentArchiver
{
@Nullable
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
@Nullable
@Override
public DataSegment restore(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import io.druid.timeline.DataSegment;
import java.io.IOException;
/**
* Mostly used for test purposes.
*/
public class NoopDataSegmentKiller implements DataSegmentKiller
{
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
}
@Override
public void killAll() throws IOException
{
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import io.druid.timeline.DataSegment;
import java.util.Map;
/**
* Mostly used for test purposes.
*/
public class NoopDataSegmentMover implements DataSegmentMover
{
@Override
public DataSegment move(
DataSegment segment, Map<String, Object> targetLoadSpec
)
{
return segment;
}
}

View File

@ -81,6 +81,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -23,16 +23,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.guice.annotations.Self;
import io.druid.indexer.TaskLocation;
@ -44,8 +38,14 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Numbers;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -57,40 +57,35 @@ import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
import java.util.Collection;
import java.util.Comparator;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* Runs tasks in a JVM thread using an ExecutorService.
* Runs a single task in a JVM thread using an ExecutorService.
*/
public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
private static final EmittingLogger log = new EmittingLogger(SingleTaskBackgroundRunner.class);
private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
private final ConcurrentMap<Integer, ListeningExecutorService> exec = new ConcurrentHashMap<>();
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>(
ThreadPoolTaskRunnerWorkItem.COMPARATOR
);
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
private final ServiceEmitter emitter;
private final TaskLocation location;
private final ServerConfig serverConfig;
private volatile boolean stopping = false;
// Currently any listeners are registered in peons, but they might be used in the future.
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
private volatile ListeningExecutorService executorService;
private volatile SingleTaskBackgroundRunnerWorkItem runningItem;
private volatile boolean stopping;
@Inject
public ThreadPoolTaskRunner(
public SingleTaskBackgroundRunner(
TaskToolboxFactory toolboxFactory,
TaskConfig taskConfig,
ServiceEmitter emitter,
@ -108,7 +103,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
return Collections.emptyList();
}
@Override
@ -127,8 +122,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
listeners.add(listenerPair);
log.info("Registered listener [%s]", listener.getListenerId());
for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
if (runningItem != null) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
runningItem.getTaskId(),
runningItem.getLocation()
);
}
}
@ -154,23 +153,30 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
);
}
@Override
@LifecycleStart
public void start()
{
// No state startup required
}
@Override
@LifecycleStop
public void stop()
{
stopping = true;
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
if (executorService != null) {
try {
entry.getValue().shutdown();
executorService.shutdown();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}
for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
final Task task = item.getTask();
if (runningItem != null) {
final Task task = runningItem.getTask();
final long start = System.currentTimeMillis();
final boolean graceful;
final long elapsed;
@ -183,7 +189,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
try {
task.stopGracefully();
final TaskStatus taskStatus = item.getResult().get(
final TaskStatus taskStatus = runningItem.getResult().get(
new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
);
@ -225,9 +231,9 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
// Ok, now interrupt everything.
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
if (executorService != null) {
try {
entry.getValue().shutdownNow();
executorService.shutdownNow();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
@ -238,87 +244,63 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
int taskPriority = 0;
if (taskPriorityObj != null) {
if (taskPriorityObj instanceof Number) {
taskPriority = ((Number) taskPriorityObj).intValue();
} else if (taskPriorityObj instanceof String) {
try {
taskPriority = Integer.parseInt(taskPriorityObj.toString());
}
catch (NumberFormatException e) {
log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
if (runningItem == null) {
final TaskToolbox toolbox = toolboxFactory.build(task);
final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
int taskPriority = 0;
try {
taskPriority = taskPriorityObj == null ? 0 : Numbers.parseInt(taskPriorityObj);
}
}
// Ensure an executor for that priority exists
if (!exec.containsKey(taskPriority)) {
final ListeningExecutorService executorService = buildExecutorService(taskPriority);
if (exec.putIfAbsent(taskPriority, executorService) != null) {
// favor prior service
executorService.shutdownNow();
catch (NumberFormatException e) {
log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
// Ensure an executor for that priority exists
executorService = buildExecutorService(taskPriority);
final ListenableFuture<TaskStatus> statusFuture = executorService.submit(
new SingleTaskBackgroundRunnerCallable(task, location, toolbox)
);
runningItem = new SingleTaskBackgroundRunnerWorkItem(
task,
location,
statusFuture
);
return statusFuture;
} else {
throw new ISE("Already running task[%s]", runningItem.getTask().getId());
}
final ListenableFuture<TaskStatus> statusFuture = exec.get(taskPriority)
.submit(new ThreadPoolTaskRunnerCallable(
task,
location,
toolbox
));
final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(
task,
location,
statusFuture
);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
);
return statusFuture;
}
/**
* There might be a race between {@link #run(Task)} and this method, but it shouldn't happen in real applications
* because this method is called only in unit tests. See TaskLifecycleTest.
*
* @param taskid task ID to clean up resources for
*/
@Override
public void shutdown(final String taskid)
{
for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTaskId().equals(taskid)) {
runningItem.getResult().cancel(true);
}
if (runningItem != null && runningItem.getTask().getId().equals(taskid)) {
runningItem.getResult().cancel(true);
}
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem);
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return ImmutableList.of();
return Collections.emptyList();
}
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
return runningItem == null ? Collections.emptyList() : Collections.singletonList(runningItem);
}
@Override
@ -327,12 +309,6 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return Optional.absent();
}
@Override
public void start()
{
// No state startup required
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
@ -350,8 +326,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
QueryRunner<T> queryRunner = null;
final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (runningItem != null) {
final Task task = runningItem.getTask();
if (task.getDataSource().equals(queryDataSource)) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
@ -367,30 +343,18 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
return new SetAndVerifyContextQueryRunner(
return new SetAndVerifyContextQueryRunner<>(
serverConfig,
queryRunner == null ? new NoopQueryRunner<T>() : queryRunner
queryRunner == null ? new NoopQueryRunner<>() : queryRunner
);
}
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
private static class SingleTaskBackgroundRunnerWorkItem extends TaskRunnerWorkItem
{
private static final Comparator<ThreadPoolTaskRunnerWorkItem> COMPARATOR = new Comparator<ThreadPoolTaskRunnerWorkItem>()
{
@Override
public int compare(
ThreadPoolTaskRunnerWorkItem lhs,
ThreadPoolTaskRunnerWorkItem rhs
)
{
return lhs.getTaskId().compareTo(rhs.getTaskId());
}
};
private final Task task;
private final TaskLocation location;
private ThreadPoolTaskRunnerWorkItem(
private SingleTaskBackgroundRunnerWorkItem(
Task task,
TaskLocation location,
ListenableFuture<TaskStatus> result
@ -425,13 +389,13 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>
private class SingleTaskBackgroundRunnerCallable implements Callable<TaskStatus>
{
private final Task task;
private final TaskLocation location;
private final TaskToolbox toolbox;
public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox)
SingleTaskBackgroundRunnerCallable(Task task, TaskLocation location, TaskToolbox toolbox)
{
this.task = task;
this.location = location;

View File

@ -44,6 +44,11 @@ public interface TaskRunner
*/
List<Pair<Task, ListenableFuture<TaskStatus>>> restore();
/**
* Start the state of the runner.
*/
void start();
/**
* Register a listener with this task runner. On registration, the listener will get events corresponding to the
* current state of known tasks.
@ -95,12 +100,4 @@ public interface TaskRunner
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
*/
Optional<ScalingStats> getScalingStats();
/**
* Start the state of the runner.
*
* This method is unused, but TaskRunner is {@link PublicApi}, so we cannot remove it.
*/
@SuppressWarnings("unused")
void start();
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexer.TaskState;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.NoopTask;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.segment.loading.NoopDataSegmentArchiver;
import io.druid.segment.loading.NoopDataSegmentKiller;
import io.druid.segment.loading.NoopDataSegmentMover;
import io.druid.segment.loading.NoopDataSegmentPusher;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.server.DruidNode;
import io.druid.server.coordination.NoopDataSegmentAnnouncer;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SingleTaskBackgroundRunnerTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private SingleTaskBackgroundRunner runner;
@Before
public void setup() throws IOException
{
final TestUtils utils = new TestUtils();
final DruidNode node = new DruidNode("testServer", "testHost", 1000, null, true, false);
final TaskConfig taskConfig = new TaskConfig(
temporaryFolder.newFile().toString(),
null,
null,
50000,
null,
true,
null,
null
);
final ServiceEmitter emitter = new NoopServiceEmitter();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(
taskConfig,
EasyMock.createMock(TaskActionClientFactory.class),
emitter,
new NoopDataSegmentPusher(),
new NoopDataSegmentKiller(),
new NoopDataSegmentMover(),
new NoopDataSegmentArchiver(),
new NoopDataSegmentAnnouncer(),
null,
null,
null,
null,
null,
new SegmentLoaderFactory(EasyMock.createMock(SegmentLoaderLocalCacheManager.class)),
utils.getTestObjectMapper(),
utils.getTestIndexIO(),
null,
null,
utils.getTestIndexMergerV9(),
null,
node,
null,
null,
new TaskReportFileWriter(new File("fake"))
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,
taskConfig,
emitter,
node,
new ServerConfig()
);
}
@After
public void teardown()
{
runner.stop();
}
@Test
public void testRun() throws ExecutionException, InterruptedException
{
Assert.assertEquals(
TaskState.SUCCESS,
runner.run(new NoopTask(null, null, 500L, 0, null, null, null)).get().getStatusCode()
);
}
@Test
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{
final ListenableFuture<TaskStatus> future = runner.run(
new NoopTask(null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task
);
runner.stop();
Assert.assertEquals(
TaskState.FAILED,
future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
);
}
@Test
public void testStopWithRestorableTask() throws InterruptedException, ExecutionException, TimeoutException
{
final BooleanHolder holder = new BooleanHolder();
final ListenableFuture<TaskStatus> future = runner.run(
new RestorableTask(holder)
);
runner.stop();
Assert.assertEquals(
TaskState.SUCCESS,
future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
);
Assert.assertTrue(holder.get());
}
private static class RestorableTask extends AbstractTask
{
private final BooleanHolder gracefullyStopped;
RestorableTask(BooleanHolder gracefullyStopped)
{
super("testId", "testDataSource", Collections.emptyMap());
this.gracefullyStopped = gracefullyStopped;
}
@Override
public String getType()
{
return "restorable";
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
return TaskStatus.success(getId());
}
@Override
public boolean canRestore()
{
return true;
}
@Override
public void stopGracefully()
{
gracefullyStopped.set();
}
}
private static class BooleanHolder
{
private boolean value;
void set()
{
this.value = true;
}
boolean get()
{
return value;
}
}
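  // Hypothetical extra check, not part of this commit: SingleTaskBackgroundRunner keeps a single
  // runningItem, so a second concurrent run() is rejected with an ISE ("Already running task[...]").
  // Assumes an additional import of io.druid.java.util.common.ISE in this test class.
  @Test
  public void testRejectsConcurrentTask()
  {
    runner.run(new NoopTask(null, null, Long.MAX_VALUE, 0, null, null, null)); // keep the runner busy
    try {
      runner.run(new NoopTask(null, null, 500L, 0, null, null, null));
      Assert.fail("expected an ISE because only one task may run at a time");
    }
    catch (ISE e) {
      Assert.assertTrue(e.getMessage().startsWith("Already running task"));
    }
  }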
}

View File

@ -617,7 +617,7 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(taskConfig);
Preconditions.checkNotNull(emitter);
return new ThreadPoolTaskRunner(
return new SingleTaskBackgroundRunner(
tb,
taskConfig,
emitter,

View File

@ -0,0 +1,405 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Numbers;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* Runs multiple tasks in JVM threads, using a single-threaded ExecutorService per task priority. This task runner is
* intended to be used only in unit tests.
*/
public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(TestTaskRunner.class);
private final ConcurrentMap<Integer, ListeningExecutorService> exec = new ConcurrentHashMap<>();
private final Set<TestTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
private final TaskLocation taskLocation;
private volatile boolean stopping = false;
public TestTaskRunner(
TaskToolboxFactory toolboxFactory,
TaskConfig taskConfig,
TaskLocation taskLocation
)
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig;
this.taskLocation = taskLocation;
}
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
return ImmutableList.of();
}
@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
for (Pair<TaskRunnerListener, Executor> pair : listeners) {
if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
throw new ISE("Listener [%s] already registered", listener.getListenerId());
}
}
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
// Location never changes for an existing task, so it's ok to add the listener first and then issue bootstrap
// callbacks without any special synchronization.
listeners.add(listenerPair);
log.info("Registered listener [%s]", listener.getListenerId());
for (TestTaskRunnerWorkItem item : runningItems) {
TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
}
}
@Override
public void unregisterListener(String listenerId)
{
for (Pair<TaskRunnerListener, Executor> pair : listeners) {
if (pair.lhs.getListenerId().equals(listenerId)) {
listeners.remove(pair);
log.info("Unregistered listener [%s]", listenerId);
return;
}
}
}
private static ListeningExecutorService buildExecutorService(int priority)
{
return MoreExecutors.listeningDecorator(
Execs.singleThreaded(
"test-task-runner-%d-priority-" + priority,
TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
)
);
}
@Override
public void stop()
{
stopping = true;
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdown();
}
catch (SecurityException ex) {
throw new RuntimeException("I can't control my own threads!", ex);
}
}
for (TestTaskRunnerWorkItem item : runningItems) {
final Task task = item.getTask();
final long start = System.currentTimeMillis();
if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
// Attempt graceful shutdown.
log.info("Starting graceful shutdown of task[%s].", task.getId());
try {
task.stopGracefully();
final TaskStatus taskStatus = item.getResult().get(
new Interval(DateTimes.utc(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(),
TimeUnit.MILLISECONDS
);
// Ignore status, it doesn't matter for graceful shutdowns.
log.info(
"Graceful shutdown of task[%s] finished in %,dms.",
task.getId(),
System.currentTimeMillis() - start
);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
}
catch (Exception e) {
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
throw new RE(e, "Graceful shutdown of task[%s] aborted with exception", task.getId());
}
} else {
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
}
}
// Ok, now interrupt everything.
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdownNow();
}
catch (SecurityException ex) {
throw new RuntimeException("I can't control my own threads!", ex);
}
}
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
int taskPriority = 0;
try {
taskPriority = taskPriorityObj == null ? 0 : Numbers.parseInt(taskPriorityObj);
}
catch (NumberFormatException e) {
log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
final int finalTaskPriority = taskPriority;
final ListenableFuture<TaskStatus> statusFuture = exec
.computeIfAbsent(taskPriority, k -> buildExecutorService(finalTaskPriority))
.submit(new TestTaskRunnerCallable(task, toolbox));
final TestTaskRunnerWorkItem taskRunnerWorkItem = new TestTaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture,
new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
);
return statusFuture;
}
@Override
public void shutdown(final String taskid)
{
for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTaskId().equals(taskid)) {
runningItem.getResult().cancel(true);
}
}
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return ImmutableList.copyOf(runningItems);
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return ImmutableList.of();
}
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
return ImmutableList.copyOf(runningItems);
}
@Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.absent();
}
@Override
public void start()
{
// No state startup required
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
throw new UnsupportedOperationException();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
throw new UnsupportedOperationException();
}
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
implements Comparable<TestTaskRunnerWorkItem>
{
private final Task task;
private TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result)
{
super(task.getId(), result);
this.task = task;
}
public Task getTask()
{
return task;
}
@Override
public TaskLocation getLocation()
{
return TaskLocation.create("testHost", 10000, 10000);
}
@Override
public String getTaskType()
{
return task.getType();
}
@Override
public String getDataSource()
{
return task.getDataSource();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o instanceof TestTaskRunnerWorkItem) {
final TestTaskRunnerWorkItem that = (TestTaskRunnerWorkItem) o;
return task.getId().equals(that.task.getId());
}
return false;
}
@Override
public int hashCode()
{
return task.getId().hashCode();
}
@Override
public int compareTo(TestTaskRunnerWorkItem o)
{
return task.getId().compareTo(o.task.getId());
}
}
private class TestTaskRunnerCallable implements Callable<TaskStatus>
{
private final Task task;
private final TaskToolbox toolbox;
public TestTaskRunnerCallable(Task task, TaskToolbox toolbox)
{
this.task = task;
this.toolbox = toolbox;
}
@Override
public TaskStatus call()
{
final long startTime = System.currentTimeMillis();
TaskStatus status;
try {
log.info("Running task: %s", task.getId());
TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
taskLocation
);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));
status = task.run(toolbox);
}
catch (InterruptedException e) {
// Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable.
if (stopping) {
// Tasks may interrupt their own run threads to stop themselves gracefully; don't be too scary about this.
log.debug(e, "Interrupted while running task[%s] during graceful shutdown.", task);
} else {
// Not stopping, this is definitely unexpected.
log.warn(e, "Interrupted while running task[%s]", task);
}
status = TaskStatus.failure(task.getId());
}
catch (Exception e) {
log.error(e, "Exception while running task[%s]", task);
status = TaskStatus.failure(task.getId());
}
catch (Throwable t) {
throw new RE(t, "Uncaught Throwable while running task[%s]", task);
}
status = status.withDuration(System.currentTimeMillis() - startTime);
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
return status;
}
}
}

View File

@ -22,7 +22,6 @@ package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.discovery.DruidLeaderClient;
import io.druid.indexer.TaskLocation;
@ -38,18 +37,15 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.overlord.TestTaskRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.ChangeRequestHistory;
import io.druid.server.coordination.ChangeRequestsSnapshot;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@ -57,14 +53,14 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Collections;
import java.util.List;
/**
*/
public class WorkerTaskManagerTest
{
private static final DruidNode DUMMY_NODE = new DruidNode("dummy", "dummy", 9000, null, true, false);
private final TaskLocation location = TaskLocation.create("localhost", 1, 2);
private final ObjectMapper jsonMapper;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -98,26 +94,33 @@ public class WorkerTaskManagerTest
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Collections.emptyList();
}
};
return new WorkerTaskManager(
jsonMapper,
new ThreadPoolTaskRunner(
new TestTaskRunner(
new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
},
jsonMapper
)
),
null,
null,
null,
null,
null,
null,
null,
notifierFactory,
null,
null,
null,
new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager(null, loaderConfig, jsonMapper)),
jsonMapper,
indexIO,
null,
@ -130,9 +133,7 @@ public class WorkerTaskManagerTest
new NoopTestTaskFileWriter()
),
taskConfig,
new NoopServiceEmitter(),
DUMMY_NODE,
new ServerConfig()
location
),
taskConfig,
EasyMock.createNiceMock(DruidLeaderClient.class)
@ -181,7 +182,7 @@ public class WorkerTaskManagerTest
TaskAnnouncement.create(
task2,
TaskStatus.success(task2.getId()),
TaskLocation.create("localhost", 1, 2)
location
)
);
@ -196,11 +197,9 @@ public class WorkerTaskManagerTest
Assert.assertTrue(new File(workerTaskManager.getCompletedTaskDir(), task1.getId()).exists());
Assert.assertFalse(new File(workerTaskManager.getAssignedTaskDir(), task1.getId()).exists());
ChangeRequestsSnapshot<WorkerHistoryItem> baseHistory = workerTaskManager.getChangesSince(
new ChangeRequestHistory.Counter(
-1,
0
)).get();
ChangeRequestsSnapshot<WorkerHistoryItem> baseHistory = workerTaskManager
.getChangesSince(new ChangeRequestHistory.Counter(-1, 0))
.get();
Assert.assertFalse(baseHistory.isResetCounter());
Assert.assertEquals(3, baseHistory.getRequests().size());

View File

@ -39,7 +39,7 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.overlord.SingleTaskBackgroundRunner;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
@ -165,7 +165,7 @@ public class WorkerTaskMonitorTest
EasyMock.replay(taskActionClientFactory, taskActionClient, notifierFactory);
return new WorkerTaskMonitor(
jsonMapper,
new ThreadPoolTaskRunner(
new SingleTaskBackgroundRunner(
new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,

View File

@ -0,0 +1,98 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common;
public final class Numbers
{
/**
* Parse the given object as a {@code long}. The input object can be a {@link String} or one of the implementations of
* {@link Number}. You may want to use {@code GuavaUtils.tryParseLong()} instead if the input is a nullable string and
* you want to avoid any exceptions.
*
* @throws NumberFormatException if the input is an unparseable string.
* @throws NullPointerException if the input is null.
* @throws ISE if the input is not a string or a number.
*/
public static long parseLong(Object val)
{
if (val instanceof String) {
return Long.parseLong((String) val);
} else if (val instanceof Number) {
return ((Number) val).longValue();
} else {
if (val == null) {
throw new NullPointerException("Input is null");
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
}
/**
* Parse the given object as an {@code int}. The input object can be a {@link String} or one of the implementations of
* {@link Number}.
*
* @throws NumberFormatException if the input is an unparseable string.
* @throws NullPointerException if the input is null.
* @throws ISE if the input is not a string or a number.
*/
public static int parseInt(Object val)
{
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Number) {
return ((Number) val).intValue();
} else {
if (val == null) {
throw new NullPointerException("Input is null");
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
}
/**
* Parse the given object as a {@code boolean}. The input object can be a {@link String} or {@link Boolean}.
*
* @return {@code true} only if the input is a {@link Boolean} representing {@code true} or a {@link String} of
* {@code "true"}.
*
* @throws NullPointerException if the input is null.
* @throws ISE if the input is neither a string nor a boolean.
*/
public static boolean parseBoolean(Object val)
{
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
if (val == null) {
throw new NullPointerException("Input is null");
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
}
private Numbers()
{
}
}
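
A quick illustration (not part of this commit) of the behavior documented in the javadocs above; the hypothetical NumbersExample class below just drives the public parse methods, and the NumbersTest file further down exercises the same cases, including the exception paths.

package io.druid.java.util.common;

public class NumbersExample
{
  public static void main(String[] args)
  {
    System.out.println(Numbers.parseLong("100"));      // 100
    System.out.println(Numbers.parseInt(100.3));       // 100, via Number#intValue()
    System.out.println(Numbers.parseBoolean("true"));  // true
    try {
      Numbers.parseInt(new Object());                  // neither a String nor a Number
    }
    catch (ISE e) {
      System.out.println(e.getMessage());              // Unknown type [class java.lang.Object]
    }
  }
}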

View File

@ -0,0 +1,130 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class NumbersTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testParseLong()
{
final String strVal = "100";
Assert.assertEquals(100L, Numbers.parseLong(strVal));
final Long longVal = 100L;
Assert.assertEquals(100L, Numbers.parseLong(longVal));
final Double doubleVal = 100.;
Assert.assertEquals(100L, Numbers.parseLong(doubleVal));
}
@Test
public void testParseLongWithNull()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("Input is null");
Numbers.parseLong(null);
}
@Test
public void testParseLongWithUnparseableString()
{
expectedException.expect(NumberFormatException.class);
Numbers.parseLong("unparseable");
}
@Test
public void testParseLongWithUnparseableObject()
{
expectedException.expect(ISE.class);
expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
Numbers.parseLong(new Object());
}
@Test
public void testParseInt()
{
final String strVal = "100";
Assert.assertEquals(100, Numbers.parseInt(strVal));
final Integer intVal = 100;
Assert.assertEquals(100, Numbers.parseInt(intVal));
final Float floatVal = 100.F;
Assert.assertEquals(100, Numbers.parseInt(floatVal));
}
@Test
public void testParseIntWithNull()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("Input is null");
Numbers.parseInt(null);
}
@Test
public void testParseIntWithUnparseableString()
{
expectedException.expect(NumberFormatException.class);
Numbers.parseInt("unparseable");
}
@Test
public void testParseIntWithUnparseableObject()
{
expectedException.expect(ISE.class);
expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
Numbers.parseInt(new Object());
}
@Test
public void testParseBoolean()
{
final String strVal = "false";
Assert.assertEquals(false, Numbers.parseBoolean(strVal));
final Boolean booleanVal = Boolean.FALSE;
Assert.assertEquals(false, Numbers.parseBoolean(booleanVal));
}
@Test
public void testParseBooleanWithNull()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("Input is null");
Numbers.parseBoolean(null);
}
@Test
public void testParseBooleanWithUnparseableObject()
{
expectedException.expect(ISE.class);
expectedException.expectMessage(CoreMatchers.startsWith("Unknown type"));
Numbers.parseBoolean(new Object());
}
}

View File

@ -23,7 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Numbers;
@PublicApi
public class QueryContexts
@ -212,46 +212,23 @@ public class QueryContexts
static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Long.parseLong((String) val);
} else if (val instanceof Number) {
return ((Number) val).longValue();
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
final Object val = query.getContextValue(key);
return val == null ? defaultValue : Numbers.parseLong(val);
}
static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Number) {
return ((Number) val).intValue();
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
final Object val = query.getContextValue(key);
return val == null ? defaultValue : Numbers.parseInt(val);
}
static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
final Object val = query.getContextValue(key);
return val == null ? defaultValue : Numbers.parseBoolean(val);
}
private QueryContexts()
{
}
}

View File

@ -116,7 +116,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new SetAndVerifyContextQueryRunner(
new SetAndVerifyContextQueryRunner<>(
serverConfig,
new RetryQueryRunner<>(
baseClientRunner,

View File

@ -31,38 +31,32 @@ import java.util.Map;
/**
* Use this QueryRunner to set and verify Query contexts.
*/
public class SetAndVerifyContextQueryRunner implements QueryRunner
public class SetAndVerifyContextQueryRunner<T> implements QueryRunner<T>
{
private final ServerConfig serverConfig;
private final QueryRunner baseRunner;
private final QueryRunner<T> baseRunner;
public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner)
public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner<T> baseRunner)
{
this.serverConfig = serverConfig;
this.baseRunner = baseRunner;
}
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return baseRunner.run(
QueryPlus.wrap((Query) withTimeoutAndMaxScatterGatherBytes(
queryPlus.getQuery(),
serverConfig
)),
QueryPlus.wrap(withTimeoutAndMaxScatterGatherBytes(queryPlus.getQuery(), serverConfig)),
responseContext
);
}
public static <T, QueryType extends Query<T>> QueryType withTimeoutAndMaxScatterGatherBytes(
final QueryType query,
ServerConfig serverConfig
)
public Query<T> withTimeoutAndMaxScatterGatherBytes(Query<T> query, ServerConfig serverConfig)
{
return (QueryType) QueryContexts.verifyMaxQueryTimeout(
return QueryContexts.verifyMaxQueryTimeout(
QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
(Query) query,
query,
Math.min(serverConfig.getDefaultQueryTimeout(), serverConfig.getMaxQueryTimeout())
),
serverConfig.getMaxScatterGatherBytes()

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import io.druid.timeline.DataSegment;
/**
* Mostly used for test purposes.
*/
public class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
public void announceSegment(DataSegment segment)
{
}
@Override
public void unannounceSegment(DataSegment segment)
{
}
@Override
public void announceSegments(Iterable<DataSegment> segments)
{
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments)
{
}
}

View File

@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
@ -33,6 +31,8 @@ import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.BySegmentQueryRunner;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.DataSource;
@ -280,26 +280,26 @@ public class ServerManager implements QuerySegmentWalker
{
SpecificSegmentSpec segmentSpec = new SpecificSegmentSpec(segmentDescriptor);
String segmentId = adapter.getIdentifier();
return new SetAndVerifyContextQueryRunner(
return new SetAndVerifyContextQueryRunner<>(
serverConfig,
CPUTimeMetricQueryRunner.safeBuild(
new SpecificSegmentQueryRunner<T>(
new MetricsEmittingQueryRunner<T>(
new SpecificSegmentQueryRunner<>(
new MetricsEmittingQueryRunner<>(
emitter,
toolChest,
new BySegmentQueryRunner<T>(
new BySegmentQueryRunner<>(
segmentId,
adapter.getDataInterval().getStart(),
new CachingQueryRunner<T>(
new CachingQueryRunner<>(
segmentId,
segmentDescriptor,
objectMapper,
cache,
toolChest,
new MetricsEmittingQueryRunner<T>(
new MetricsEmittingQueryRunner<>(
emitter,
toolChest,
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
new ReferenceCountingSegmentQueryRunner<>(factory, adapter, segmentDescriptor),
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(segmentId)
),

View File

@ -67,7 +67,7 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.overlord.SingleTaskBackgroundRunner;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.java.util.common.lifecycle.Lifecycle;
@ -207,9 +207,9 @@ public class CliPeon extends GuiceRunnable
)
);
binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadPoolTaskRunner.class);
binder.bind(ThreadPoolTaskRunner.class).in(ManageLifecycle.class);
binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());

View File

@ -20,7 +20,6 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
@ -37,15 +36,14 @@ import io.druid.guice.RealtimeModule;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.lookup.LookupModule;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.NoopDataSegmentPusher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.NoopDataSegmentAnnouncer;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@ -136,60 +134,4 @@ public class CliRealtimeExample extends ServerRunnable
return false;
}
}
private static class NoopDataSegmentPusher implements DataSegmentPusher
{
@Override
public String getPathForHadoop()
{
return "noop";
}
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
{
return getPathForHadoop();
}
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
{
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
return ImmutableMap.of();
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer
{
@Override
public void announceSegment(DataSegment segment)
{
// do nothing
}
@Override
public void unannounceSegment(DataSegment segment)
{
// do nothing
}
@Override
public void announceSegments(Iterable<DataSegment> segments)
{
// do nothing
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments)
{
// do nothing
}
}
}