removed AsyncQueryRunner since was only used by removed interval chunking stuff (#9252)

This commit is contained in:
Clint Wylie 2020-01-27 18:53:17 -08:00 committed by GitHub
parent 36c5efe2ab
commit 14253c63d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 0 additions and 224 deletions

View File

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AsyncQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final ListeningExecutorService executor;
private final QueryWatcher queryWatcher;
public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, QueryWatcher queryWatcher)
{
this.baseRunner = baseRunner;
this.executor = MoreExecutors.listeningDecorator(executor);
this.queryWatcher = queryWatcher;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<Sequence<T>> future = executor.submit(
new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
public Sequence<T> call()
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(threadSafeQueryPlus, responseContext);
}
}
);
queryWatcher.registerQuery(query, future);
return new LazySequence<>(new Supplier<Sequence<T>>()
{
@Override
public Sequence<T> get()
{
try {
if (QueryContexts.hasTimeout(query)) {
return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS);
} else {
return future.get();
}
}
catch (ExecutionException | InterruptedException | TimeoutException ex) {
throw new RuntimeException(ex);
}
}
});
}
}

View File

@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
public class AsyncQueryRunnerTest
{
private static final long TEST_TIMEOUT_MILLIS = 60_000;
private final ExecutorService executor;
private final Query query;
public AsyncQueryRunnerTest()
{
this.executor = Executors.newSingleThreadExecutor();
query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.build();
}
@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testAsyncNature()
{
final CountDownLatch latch = new CountDownLatch(1);
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
try {
latch.await();
return Sequences.simple(Collections.singletonList(1));
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
};
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(
baseRunner,
executor,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Sequence lazy = asyncRunner.run(QueryPlus.wrap(query));
latch.countDown();
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}
@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testQueryTimeoutHonored()
{
QueryRunner baseRunner = new QueryRunner()
{
@Override
public Sequence run(QueryPlus queryPlus, ResponseContext responseContext)
{
try {
Thread.sleep(Long.MAX_VALUE);
throw new RuntimeException("query should not have completed");
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
};
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(
baseRunner,
executor,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
Sequence lazy =
asyncRunner.run(QueryPlus.wrap(query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))));
try {
lazy.toList();
}
catch (RuntimeException ex) {
Assert.assertTrue(ex.getCause() instanceof TimeoutException);
return;
}
Assert.fail();
}
@Test
public void testQueryRegistration()
{
QueryRunner baseRunner = (queryPlus, responseContext) -> null;
QueryWatcher mock = EasyMock.createMock(QueryWatcher.class);
mock.registerQuery(EasyMock.eq(query), EasyMock.anyObject(ListenableFuture.class));
EasyMock.replay(mock);
AsyncQueryRunner asyncRunner = new AsyncQueryRunner<>(baseRunner, executor, mock);
asyncRunner.run(QueryPlus.wrap(query));
EasyMock.verify(mock);
}
}