Use Guava Compatible immediate executor service (#6815)

* Use multi-guava version friendly direct executor implementation

* Don't use a singleton

* Fix strict compliation complaints

* Copy Guava's DirectExecutor

* Fix javadoc

* Imports are the devil
This commit is contained in:
Charles Allen 2019-01-11 10:42:19 -08:00 committed by Slim Bouguerra
parent 39780d914c
commit 5d2947cd52
43 changed files with 2458 additions and 468 deletions

View File

@ -20,6 +20,9 @@ com.google.common.collect.Sets#newHashSet() @ Create java.util.HashSet directly
com.google.common.collect.Sets#newLinkedHashSet() @ Create java.util.LinkedHashSet directly
com.google.common.collect.Sets#newTreeSet() @ Create java.util.TreeSet directly
com.google.common.collect.Sets#newTreeSet(java.util.Comparator) @ Create java.util.TreeSet directly
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL() instead
java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.concurrent;
import com.google.common.util.concurrent.AbstractListeningExecutorService;
import javax.annotation.concurrent.GuardedBy;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
// Copy of Guava's Apache 2.0 licensed https://github.com/google/guava/blob/a5cafa67da64a12444037bd4f4c30c39a0c184aa/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L240-L339
/**
* Creates an executor service that runs each task in the thread that invokes {@code
* execute/submit}, as in {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy} This applies both to individually submitted
* tasks and to collections of tasks submitted via {@code invokeAll} or {@code invokeAny}. In the
* latter case, tasks will run serially on the calling thread. Tasks are run to completion before
* a {@code Future} is returned to the caller (unless the executor has been shutdown).
*
* <p>Although all tasks are immediately executed in the thread that submitted the task, this
* {@code ExecutorService} imposes a small locking overhead on each task submission in order to
* implement shutdown and termination behavior.
*
* <p>The implementation deviates from the {@code ExecutorService} specification with regards to
* the {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
* implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
* tasks. Second, the returned list will always be empty, as any submitted task is considered to
* have started execution. This applies also to tasks given to {@code invokeAll} or {@code
* invokeAny} which are pending serial execution, even the subset of the tasks that have not yet
* started execution. It is unclear from the {@code ExecutorService} specification if these should
* be included, and it's much easier to implement the interpretation that they not be. Finally, a
* call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls to {@code
* invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the tasks may
* already have been executed.
*/
public class DirectExecutorService extends AbstractListeningExecutorService
{
/**
* Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor
*/
private final Object lock = new Object();
/*
* Conceptually, these two variables describe the executor being in
* one of three states:
* - Active: shutdown == false
* - Shutdown: runningTasks > 0 and shutdown == true
* - Terminated: runningTasks == 0 and shutdown == true
*/
@GuardedBy("lock")
private int runningTasks = 0;
@GuardedBy("lock")
private boolean shutdown = false;
@Override
public void execute(Runnable command)
{
startTask();
try {
command.run();
}
finally {
endTask();
}
}
@Override
public boolean isShutdown()
{
synchronized (lock) {
return shutdown;
}
}
@Override
public void shutdown()
{
synchronized (lock) {
shutdown = true;
if (runningTasks == 0) {
lock.notifyAll();
}
}
}
// See newDirectExecutorService javadoc for unusual behavior of this method.
@Override
public List<Runnable> shutdownNow()
{
shutdown();
return Collections.emptyList();
}
@Override
public boolean isTerminated()
{
synchronized (lock) {
return shutdown && runningTasks == 0;
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
long nanos = unit.toNanos(timeout);
synchronized (lock) {
while (true) {
if (shutdown && runningTasks == 0) {
return true;
} else if (nanos <= 0) {
return false;
} else {
long now = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(lock, nanos);
nanos -= System.nanoTime() - now; // subtract the actual time we waited
}
}
}
}
/**
* Checks if the executor has been shut down and increments the running task count.
*
* @throws RejectedExecutionException if the executor has been previously shutdown
*/
private void startTask()
{
synchronized (lock) {
if (shutdown) {
throw new RejectedExecutionException("Executor already shutdown");
}
runningTasks++;
}
}
/**
* Decrements the running task count.
*/
private void endTask()
{
synchronized (lock) {
int numRunning = --runningTasks;
if (numRunning == 0) {
lock.notifyAll();
}
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.java.util.common.concurrent;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import javax.annotation.Nullable;
@ -38,9 +39,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class Execs
{
/**
* Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks.
*/
@ -152,4 +155,9 @@ public class Execs
}
);
}
public static ListeningExecutorService directExecutor()
{
return new DirectExecutorService();
}
}

View File

@ -118,4 +118,10 @@ public class ExecsTest
blockingExecutor.shutdown();
producer.shutdown();
}
@Test
public void testDirectExecutorFactory()
{
Assert.assertNotNull(Execs.directExecutor());
}
}

View File

@ -19,7 +19,7 @@
package org.apache.druid.java.util.common.guava;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.Assert;
import org.junit.Test;
@ -40,11 +40,11 @@ public class WithEffectSequenceTest
.simple(Arrays.asList(1, 2, 3))
.withEffect(
() -> effect1.set(counter.incrementAndGet()),
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
)
.withEffect(
() -> effect2.set(counter.incrementAndGet()),
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
// Run sequence via accumulate
sequence.toList();
@ -70,7 +70,7 @@ public class WithEffectSequenceTest
});
final AtomicBoolean effectExecuted = new AtomicBoolean();
Sequence<Integer> seqWithEffect =
throwingSeq.withEffect(() -> effectExecuted.set(true), MoreExecutors.sameThreadExecutor());
throwingSeq.withEffect(() -> effectExecuted.set(true), Execs.directExecutor());
try {
seqWithEffect.toList();
Assert.fail("expected RuntimeException");

View File

@ -261,7 +261,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
}
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
this.future = future;
final Stopwatch stopwatch = Stopwatch.createStarted();

View File

@ -2452,7 +2452,7 @@ public class KafkaIndexTaskTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())

View File

@ -2765,7 +2765,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())

View File

@ -20,8 +20,8 @@
package org.apache.druid.query.aggregation.variance;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
@ -47,6 +47,7 @@ import java.util.Collections;
import java.util.List;
/**
*
*/
@RunWith(Parameterized.class)
public class VarianceGroupByQueryTest
@ -72,7 +73,7 @@ public class VarianceGroupByQueryTest
this.testName = testName;
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner));
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
}
@Test

View File

@ -97,6 +97,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*
*/
public class IndexGeneratorJob implements Jobby
{
@ -240,7 +241,8 @@ public class IndexGeneratorJob implements Jobby
Map<String, Object> metrics = TaskMetricsUtils.makeIngestionRowMetrics(
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_WITH_ERRORS_COUNTER)
.getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_UNPARSEABLE_COUNTER).getValue(),
jobCounters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).getValue()
);
@ -345,7 +347,9 @@ public class IndexGeneratorJob implements Jobby
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}
final long truncatedTimestamp = granularitySpec.getQueryGranularity().bucketStart(inputRow.getTimestamp()).getMillis();
final long truncatedTimestamp = granularitySpec.getQueryGranularity()
.bucketStart(inputRow.getTimestamp())
.getMillis();
final byte[] hashedDimensions = hashFunction.hashBytes(
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
Rows.toGroupKey(
@ -678,7 +682,7 @@ public class IndexGeneratorJob implements Jobby
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
persistExecutor = Execs.directExecutor();
}
for (final BytesWritable bw : values) {
@ -786,7 +790,10 @@ public class IndexGeneratorJob implements Jobby
// ShardSpec to be published.
final ShardSpec shardSpecForPublishing;
if (config.isForceExtendableShardSpecs()) {
shardSpecForPublishing = new NumberedShardSpec(shardSpecForPartitioning.getPartitionNum(), config.getShardSpecCount(bucket));
shardSpecForPublishing = new NumberedShardSpec(
shardSpecForPartitioning.getPartitionNum(),
config.getShardSpecCount(bucket)
);
} else {
shardSpecForPublishing = shardSpecForPartitioning;
}

View File

@ -1051,7 +1051,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
notices.add(new RunNotice());
}
}, MoreExecutors.sameThreadExecutor()
}, Execs.directExecutor()
);
listenerRegistered = true;
}

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.CuratorUtils;
@ -30,6 +29,7 @@ import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
@ -77,7 +77,7 @@ public class WorkerCuratorCoordinator
this.curatorFramework = curatorFramework;
this.worker = worker;
this.announcer = new Announcer(curatorFramework, MoreExecutors.sameThreadExecutor());
this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));

View File

@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
@ -69,7 +68,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class manages the list of tasks assigned to this worker.
*
* <p>
* It persists the list of assigned and completed tasks on disk. assigned task from disk is deleted as soon as it
* starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for
* active tasks to see which completed tasks are safe to delete.
@ -226,7 +225,7 @@ public abstract class WorkerTaskManager
// do nothing
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
}
@ -456,9 +455,12 @@ public abstract class WorkerTaskManager
);
if (fullResponseHolder.getStatus().getCode() == 200) {
String responseContent = fullResponseHolder.getContent();
taskStatusesFromOverlord = jsonMapper.readValue(responseContent, new TypeReference<Map<String, TaskStatus>>()
taskStatusesFromOverlord = jsonMapper.readValue(
responseContent,
new TypeReference<Map<String, TaskStatus>>()
{
});
}
);
log.debug("Received completed task status response [%s].", responseContent);
} else if (fullResponseHolder.getStatus().getCode() == 404) {
// NOTE: this is to support backward compatibility, when overlord doesn't have "activeTasks" endpoint.
@ -717,5 +719,6 @@ public abstract class WorkerTaskManager
//in Overlord as well as MiddleManagers then WorkerTaskMonitor should be deleted, this class should no longer be abstract
//and the methods below should be removed.
protected abstract void taskStarted(String taskId);
protected abstract void taskAnnouncementChanged(TaskAnnouncement announcement);
}

View File

@ -1607,7 +1607,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())

View File

@ -1076,7 +1076,7 @@ public class RealtimeIndexTaskTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())

View File

@ -33,7 +33,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.Firehose;
@ -81,6 +80,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -611,7 +611,7 @@ public class TaskLifecycleTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
MoreExecutors.sameThreadExecutor(), // query executor service
Execs.directExecutor(), // query executor service
monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DiscoveryDruidNode;
@ -49,6 +48,7 @@ import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
@ -73,6 +73,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
public class HttpRemoteTaskRunnerTest
{
@ -91,7 +92,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig() {
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
@ -105,7 +107,8 @@ public class HttpRemoteTaskRunnerTest
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
) {
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
@ -184,7 +187,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig() {
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
@ -198,7 +202,8 @@ public class HttpRemoteTaskRunnerTest
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
) {
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
@ -284,7 +289,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig() {
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
@ -298,7 +304,8 @@ public class HttpRemoteTaskRunnerTest
taskStorageMock,
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
) {
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
@ -316,7 +323,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker);
worker
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
}
@ -420,7 +428,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig() {
new HttpRemoteTaskRunnerConfig()
{
@Override
public int getPendingTasksRunnerNumThreads()
{
@ -434,7 +443,8 @@ public class HttpRemoteTaskRunnerTest
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
) {
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
@ -452,7 +462,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker);
worker
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
}
@ -593,7 +604,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig() {
new HttpRemoteTaskRunnerConfig()
{
@Override
public Period getTaskCleanupTimeout()
{
@ -607,7 +619,8 @@ public class HttpRemoteTaskRunnerTest
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
) {
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
@ -625,7 +638,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker);
worker
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
}
@ -801,7 +815,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker);
worker
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
}
@ -914,7 +929,8 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
Assert.assertEquals("host3:8080",
Assert.assertEquals(
"host3:8080",
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
.getHost()
);
@ -969,7 +985,9 @@ public class HttpRemoteTaskRunnerTest
// Another "rogue-worker" reports running it, and gets asked to shutdown the task
WorkerHolder rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker()).andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")).anyTimes();
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.anyTimes();
rogueWorkerHolder.shutdownTask(task.getId());
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
@ -982,7 +1000,9 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports FAILURE for the task, ignored
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker()).andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")).anyTimes();
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.anyTimes();
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
task,
@ -1003,7 +1023,9 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports running it, and gets asked to shutdown the task
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker()).andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")).anyTimes();
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.anyTimes();
rogueWorkerHolder.shutdownTask(task.getId());
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
@ -1016,7 +1038,9 @@ public class HttpRemoteTaskRunnerTest
// "rogue-worker" reports FAILURE for the tasks, ignored
rogueWorkerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(rogueWorkerHolder.getWorker()).andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1")).anyTimes();
EasyMock.expect(rogueWorkerHolder.getWorker())
.andReturn(new Worker("http", "rogue-worker", "127.0.0.1", 5, "v1"))
.anyTimes();
EasyMock.replay(rogueWorkerHolder);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
task,
@ -1228,7 +1252,7 @@ public class HttpRemoteTaskRunnerTest
listenerNotificationsAccumulator.add(ImmutableList.of(taskId, status));
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
}

View File

@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.MergeSequence;
@ -76,11 +76,13 @@ import java.util.List;
import java.util.Map;
/**
*
*/
public class QueryRunnerTestHelper
{
public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {};
public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {
};
public static final String segmentId = "testSegment";
public static final String dataSource = "testing";
@ -506,7 +508,7 @@ public class QueryRunnerTestHelper
public static IntervalChunkingQueryRunnerDecorator sameThreadIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new ServiceEmitter("dummy", "dummy", new NoopEmitter())
);

View File

@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -32,6 +31,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -106,7 +106,7 @@ public class SchemaEvolutionTest
final Sequence<T> results = new FinalizeResultsQueryRunner<>(
factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
FunctionalIterable
.create(indexes)
.transform(

View File

@ -32,7 +32,6 @@ import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.druid.collections.CloseableStupidPool;
@ -42,6 +41,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
@ -607,7 +607,7 @@ public class AggregationTestHelper implements Closeable
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.transform(
segments,
new Function<Segment, QueryRunner>()

View File

@ -25,11 +25,11 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
@ -222,7 +222,7 @@ public class GroupByQueryMergeBufferTest
public GroupByQueryMergeBufferTest(QueryRunner<Row> runner)
{
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner));
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
}
@Before

View File

@ -24,11 +24,11 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
@ -183,7 +183,7 @@ public class GroupByQueryRunnerFailureTest
public GroupByQueryRunnerFailureTest(QueryRunner<Row> runner)
{
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner));
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
}
@Test(timeout = 60_000L)

View File

@ -22,12 +22,12 @@ package org.apache.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -55,6 +55,7 @@ import java.io.IOException;
import java.util.Map;
/**
*
*/
@RunWith(Parameterized.class)
public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
@ -93,7 +94,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
QueryRunner<Row> newRunner = factory.mergeRunners(
MoreExecutors.sameThreadExecutor(), ImmutableList.of(input)
Execs.directExecutor(), ImmutableList.of(input)
);
QueryToolChest toolChest = factory.getToolchest();

View File

@ -24,10 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.BySegmentResultValueClass;
@ -304,7 +304,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -372,7 +372,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -440,7 +440,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -557,7 +557,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -609,7 +609,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -671,7 +671,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -729,7 +729,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -787,7 +787,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Lists.newArrayList(
toolChest.preMergeQueryDecoration(runner1),
toolChest.preMergeQueryDecoration(runner2)
@ -832,7 +832,7 @@ public class SegmentMetadataQueryTest
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
//Note: It is essential to have atleast 2 query runners merged to reproduce the regression bug described in
//https://github.com/apache/incubator-druid/pull/1172
//the bug surfaces only when ordering is used which happens only when you have 2 things to compare

View File

@ -22,10 +22,10 @@ package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map;
/**
*
*/
@RunWith(Parameterized.class)
public class MultiSegmentScanQueryTest
@ -167,7 +168,10 @@ public class MultiSegmentScanQueryTest
@Parameterized.Parameters(name = "limit={0},batchSize={1}")
public static Iterable<Object[]> constructorFeeder()
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(0, 1, 3, 7, 10, 20, 1000), Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000));
return QueryRunnerTestHelper.cartesian(
Arrays.asList(0, 1, 3, 7, 10, 20, 1000),
Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000)
);
}
private final int limit;
@ -196,8 +200,9 @@ public class MultiSegmentScanQueryTest
ScanQuery query = newBuilder().build();
List<ScanResultValue> results = factory
.mergeRunners(
MoreExecutors.sameThreadExecutor(),
ImmutableList.of(factory.createRunner(segment0), factory.createRunner(segment1)))
Execs.directExecutor(),
ImmutableList.of(factory.createRunner(segment0), factory.createRunner(segment1))
)
.run(QueryPlus.wrap(query), new HashMap<>())
.toList();
int totalCount = 0;
@ -215,7 +220,8 @@ public class MultiSegmentScanQueryTest
public void testMergeResultsWithLimit()
{
QueryRunner<ScanResultValue> runner = toolChest.mergeResults(
new QueryRunner<ScanResultValue>() {
new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
QueryPlus<ScanResultValue> queryPlus,

View File

@ -21,10 +21,10 @@ package org.apache.druid.query.spec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
@ -116,6 +116,7 @@ public class SpecificSegmentQueryRunnerTest
new YieldingAccumulator()
{
final List lists = new ArrayList<>();
@Override
public Object accumulate(Object accumulated, Object in)
{
@ -162,7 +163,7 @@ public class SpecificSegmentQueryRunnerTest
throw new SegmentMissingException("FAILSAUCE");
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
}
},

View File

@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
@ -101,7 +102,7 @@ public class BackgroundCachePopulator implements CachePopulator
exec
);
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
}

View File

@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
@ -65,6 +64,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class RealtimeManager implements QuerySegmentWalker
{
@ -175,7 +175,7 @@ public class RealtimeManager implements QuerySegmentWalker
return partitionChiefs == null ? new NoopQueryRunner<T>() : factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform(
partitionChiefs.values(), new Function<FireChief, QueryRunner<T>>()
@ -202,7 +202,7 @@ public class RealtimeManager implements QuerySegmentWalker
? new NoopQueryRunner<T>()
: factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Iterables.transform(
specs,
new Function<SegmentDescriptor, QueryRunner<T>>()

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
@ -31,6 +30,7 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -206,7 +206,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
sinkSegmentIdentifier,
descriptor.getInterval().getStart(),
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()

View File

@ -26,12 +26,12 @@ import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
@ -209,7 +209,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
/**
* Persist all data indexed through this driver so far. Blocks until complete.
*
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
@ -235,7 +235,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
/**
* Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata.
*
* <p>
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
@ -331,7 +331,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
segmentIdentifier.getVersion(),
segmentIdentifier.getShardSpec().getPartitionNum()
),
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
() -> {
log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
metrics.incrementHandOffCount();

View File

@ -23,7 +23,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryWatcher;
@ -76,7 +76,7 @@ public class QueryManager implements QueryWatcher
}
}
},
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
}

View File

@ -62,6 +62,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@ -165,6 +166,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
/**
*
*/
@RunWith(Parameterized.class)
public class CachingClusteredClientTest
@ -331,7 +333,7 @@ public class CachingClusteredClientTest
final ListeningExecutorService delegate = MoreExecutors.listeningDecorator(
// we need to run everything in the same thread to ensure all callbacks on futures in CachingClusteredClient
// are complete before moving on to the next query run.
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
@Override
@ -1650,9 +1652,11 @@ public class CachingClusteredClientTest
makeTimeResults(DateTimes.of("2011-01-01"), 50, 5000,
DateTimes.of("2011-01-02"), 10, 1252,
DateTimes.of("2011-01-03"), 20, 6213,
DateTimes.of("2011-01-04"), 30, 743),
DateTimes.of("2011-01-04"), 30, 743
),
makeTimeResults(DateTimes.of("2011-01-07"), 60, 6020,
DateTimes.of("2011-01-08"), 70, 250)
DateTimes.of("2011-01-08"), 70, 250
)
);
testQueryCachingWithFilter(
@ -1888,7 +1892,12 @@ public class CachingClusteredClientTest
@Override
public Sequence answer()
{
return toFilteredQueryableTimeseriesResults((TimeseriesQuery) capture.getValue().getQuery(), segmentIds, queryIntervals, results);
return toFilteredQueryableTimeseriesResults(
(TimeseriesQuery) capture.getValue().getQuery(),
segmentIds,
queryIntervals,
results
);
}
})
.times(0, 1);
@ -1946,7 +1955,11 @@ public class CachingClusteredClientTest
MultipleSpecificSegmentSpec spec = (MultipleSpecificSegmentSpec) query.getQuerySegmentSpec();
List<Result<TimeseriesResultValue>> ret = new ArrayList<>();
for (SegmentDescriptor descriptor : spec.getDescriptors()) {
String id = StringUtils.format("%s_%s", queryIntervals.indexOf(descriptor.getInterval()), descriptor.getPartitionNumber());
String id = StringUtils.format(
"%s_%s",
queryIntervals.indexOf(descriptor.getInterval()),
descriptor.getPartitionNumber()
);
int index = segmentIds.indexOf(id);
if (index != -1) {
ret.add(new Result(
@ -2722,7 +2735,8 @@ public class CachingClusteredClientTest
return mergeLimit;
}
},
new DruidHttpClientConfig() {
new DruidHttpClientConfig()
{
@Override
public long getMaxQueuedBytes()
{
@ -3126,7 +3140,8 @@ public class CachingClusteredClientTest
@SuppressWarnings("unchecked")
private QueryRunner getDefaultQueryRunner()
{
return new QueryRunner() {
return new QueryRunner()
{
@Override
public Sequence run(final QueryPlus queryPlus, final Map responseContext)
{

View File

@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
@ -32,6 +31,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
@ -64,6 +64,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class HttpServerInventoryViewTest
{
@ -197,7 +198,7 @@ public class HttpServerInventoryViewTest
);
httpServerInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
new ServerView.SegmentCallback()
{
@Override
@ -225,7 +226,7 @@ public class HttpServerInventoryViewTest
final CountDownLatch serverRemovedCalled = new CountDownLatch(1);
httpServerInventoryView.registerServerRemovedCallback(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
new ServerView.ServerRemovedCallback()
{
@Override
@ -254,8 +255,10 @@ public class HttpServerInventoryViewTest
segmentDropLatches.get(segment2.getIdentifier()).await();
DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
Assert.assertEquals(ImmutableMap.of(segment3.getIdentifier(), segment3, segment4.getIdentifier(), segment4),
druidServer.getSegments());
Assert.assertEquals(
ImmutableMap.of(segment3.getIdentifier(), segment3, segment4.getIdentifier(), segment4),
druidServer.getSegments()
);
druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
@ -320,7 +323,10 @@ public class HttpServerInventoryViewTest
if (requestNum.get() == 2) {
//fail scenario where request is sent to server but we got an unexpected response.
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
HttpResponse httpResponse = new DefaultHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR
);
httpResponse.setContent(ChannelBuffers.buffer(0));
httpResponseHandler.handleResponse(httpResponse, null);
return Futures.immediateFailedFuture(new RuntimeException("server error"));

View File

@ -43,6 +43,7 @@ import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
@ -77,6 +78,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class BatchServerInventoryViewTest
{
@ -118,7 +120,7 @@ public class BatchServerInventoryViewTest
announcer = new Announcer(
cf,
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
announcer.start();
@ -204,7 +206,8 @@ public class BatchServerInventoryViewTest
return input.rhs.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
}
}
) {
)
{
@Override
protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
{
@ -337,7 +340,7 @@ public class BatchServerInventoryViewTest
EasyMock.replay(callback);
filteredBatchServerInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
callback,
new Predicate<Pair<DruidServerMetadata, DataSegment>>()
{
@ -407,7 +410,11 @@ public class BatchServerInventoryViewTest
while (inventoryUpdateCounter.get() != count) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get());
throw new ISE(
"BatchServerInventoryView is not updating counter expected[%d] value[%d]",
count,
inventoryUpdateCounter.get()
);
}
}
}

View File

@ -23,13 +23,13 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
@ -42,6 +42,7 @@ import java.util.HashSet;
import java.util.Set;
/**
*
*/
public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
{
@ -68,7 +69,7 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
Announcer announcer = new Announcer(
curator,
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
announcer.start();

View File

@ -27,7 +27,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.ParseException;
@ -100,6 +100,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class RealtimeManagerTest
{
@ -1018,7 +1019,8 @@ public class RealtimeManagerTest
}
@Override
public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier)
throws IndexSizeExceededException
{
if (row == null) {
return Plumber.THROWAWAY;
@ -1055,7 +1057,7 @@ public class RealtimeManagerTest
return factory.getToolchest()
.mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
Iterables.transform(
baseQuery.getIntervals(),
new Function<Interval, QueryRunner<T>>()

View File

@ -19,9 +19,9 @@
package org.apache.druid.segment.realtime.plumber;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.query.SegmentDescriptor;
import org.easymock.EasyMock;
import org.joda.time.Duration;
@ -62,7 +62,7 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
notifier.registerSegmentHandoffCallback(
descriptor,
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
() -> callbackCalled.set(true)
);
notifier.checkForSegmentHandoffs();
@ -93,7 +93,7 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
notifier.registerSegmentHandoffCallback(
descriptor,
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
() -> callbackCalled.set(true)
);
Assert.assertEquals(1, notifier.getHandOffCallbacks().size());

View File

@ -24,7 +24,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
@ -38,6 +37,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@ -79,6 +79,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
*/
@RunWith(Parameterized.class)
public class RealtimePlumberSchoolTest
@ -115,7 +116,10 @@ public class RealtimePlumberSchoolTest
private FireDepartmentMetrics metrics;
private File tmpDir;
public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
public RealtimePlumberSchoolTest(
RejectionPolicyFactory rejectionPolicy,
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
)
{
this.rejectionPolicy = rejectionPolicy;
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
@ -219,7 +223,7 @@ public class RealtimePlumberSchoolTest
announcer,
segmentPublisher,
handoffNotifierFactory,
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
TestHelper.getTestIndexIO(),
MapCache.create(0),

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
@ -35,6 +34,7 @@ import org.apache.curator.test.TestingCluster;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.coordination.ChangeRequestHistory;
@ -58,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class BatchDataSegmentAnnouncerTest
{
@ -97,7 +98,7 @@ public class BatchDataSegmentAnnouncerTest
announcer = new Announcer(
cf,
MoreExecutors.sameThreadExecutor()
Execs.directExecutor()
);
announcer.start();

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -48,6 +47,7 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -485,7 +485,7 @@ public class DumpSegment extends GuiceRunnable
final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment("segment", index));
return factory
.getToolchest()
.mergeResults(factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner)))
.mergeResults(factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)))
.run(QueryPlus.wrap(query), new HashMap<>());
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
@ -34,6 +33,7 @@ import org.apache.druid.client.TimelineServerView;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
@ -141,7 +141,7 @@ public class DruidSchema extends AbstractSchema
this.escalator = escalator;
serverView.registerTimelineCallback(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
new TimelineServerView.TimelineCallback()
{
@Override

View File

@ -23,9 +23,9 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.FinalizeResultsQueryRunner;
@ -224,7 +224,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
return new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Execs.directExecutor(),
FunctionalIterable
.create(specs)
.transformCat(