Async reads for JDBC (#13196)

Async reads for JDBC:
Prevents JDBC timeouts on long queries by returning empty batches
when a batch fetch takes too long. Uses an async model to run the
result fetch concurrently with JDBC requests.

Fixed race condition in Druid's Avatica server-side handler
Fixed issue with no-user connections
This commit is contained in:
Paul Rogers 2022-10-18 11:40:57 -07:00 committed by GitHub
parent cc10350870
commit b34b4353f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 663 additions and 331 deletions

View File

@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
*/
public class Execs
{
/**
* Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks.
*/

View File

@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. When a request for the next batch of data takes longer than this time, Druid returns an empty result set, causing the client to poll again. This avoids HTTP timeouts for long-running queries. The default of 5 sec. is good for most cases. |5000|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|

View File

@ -402,7 +402,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
HttpServletResponse response
) throws ServletException, IOException
{
// Just call the superclass service method. Overriden in tests.
// Just call the superclass service method. Overridden in tests.
super.service(request, response);
}

View File

@ -247,7 +247,11 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PrepareResult;
@ -56,16 +57,19 @@ public abstract class AbstractDruidJdbcStatement implements Closeable
protected final String connectionId;
protected final int statementId;
protected final ResultFetcherFactory fetcherFactory;
protected Throwable throwable;
protected DruidJdbcResultSet resultSet;
public AbstractDruidJdbcStatement(
final String connectionId,
final int statementId
final int statementId,
final ResultFetcherFactory fetcherFactory
)
{
this.connectionId = Preconditions.checkNotNull(connectionId, "connectionId");
this.statementId = statementId;
this.fetcherFactory = fetcherFactory;
}
protected static Meta.Signature createSignature(

View File

@ -30,6 +30,7 @@ class AvaticaServerConfig
public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;
public static int DEFAULT_FETCH_TIMEOUT_MS = 5000;
@JsonProperty
public int maxConnections = DEFAULT_MAX_CONNECTIONS;
@ -46,6 +47,17 @@ class AvaticaServerConfig
@JsonProperty
public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;
/**
* The maximum amount of time to wait per-fetch for the next result set.
* If a query takes longer than this amount of time, then the fetch will
* return 0 rows, without EOF, and the client will automatically try
* another fetch. The result is an async protocol that avoids network
* timeouts for long-running queries, especially those that take a long
* time to deliver a first batch of results.
*/
@JsonProperty
public int fetchTimeoutMs = DEFAULT_FETCH_TIMEOUT_MS;
public int getMaxConnections()
{
return maxConnections;
@ -77,4 +89,9 @@ class AvaticaServerConfig
}
return minRowsPerFrame;
}
public int getFetchTimeoutMs()
{
return fetchTimeoutMs;
}
}

View File

@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
{
public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica";
public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/";
@Inject
public DruidAvaticaJsonHandler(

View File

@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
{
public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica-protobuf";
public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/";
@Inject
public DruidAvaticaProtobufHandler(

View File

@ -24,8 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.PreparedStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import java.util.Collections;
import java.util.Map;
@ -37,6 +39,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Connection tracking for {@link DruidMeta}. Thread-safe.
* <p>
* Lock is the instance itself. Used here to protect two members, and in
* other code when we must resolve the connection after resolving the statement.
* The lock prevents closing the connection concurrently with an operation on
* a statement for that connection.
*/
public class DruidConnection
{
@ -56,13 +63,12 @@ public class DruidConnection
private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference<Future<?>> timeoutFuture = new AtomicReference<>();
// Typically synchronized by connectionLock, except in one case: the onClose function passed
// Typically synchronized by this instance, except in one case: the onClose function passed
// into DruidStatements contained by the map.
@GuardedBy("connectionLock")
@GuardedBy("this")
private final ConcurrentMap<Integer, AbstractDruidJdbcStatement> statements = new ConcurrentHashMap<>();
private final Object connectionLock = new Object();
@GuardedBy("connectionLock")
@GuardedBy("this")
private boolean open = true;
public DruidConnection(
@ -93,13 +99,14 @@ public class DruidConnection
return userSecret;
}
public DruidJdbcStatement createStatement(
final SqlStatementFactory sqlStatementFactory
public synchronized DruidJdbcStatement createStatement(
final SqlStatementFactory sqlStatementFactory,
final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
synchronized (connectionLock) {
synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements.
@ -114,7 +121,9 @@ public class DruidConnection
final DruidJdbcStatement statement = new DruidJdbcStatement(
connectionId,
statementId,
sqlStatementFactory
sessionContext,
sqlStatementFactory,
fetcherFactory
);
statements.put(statementId, statement);
@ -123,15 +132,16 @@ public class DruidConnection
}
}
public DruidJdbcPreparedStatement createPreparedStatement(
public synchronized DruidJdbcPreparedStatement createPreparedStatement(
final SqlStatementFactory sqlStatementFactory,
final SqlQueryPlus sqlQueryPlus,
final long maxRowCount
final long maxRowCount,
final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
synchronized (connectionLock) {
synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements.
@ -143,11 +153,15 @@ public class DruidConnection
}
@SuppressWarnings("GuardedBy")
final PreparedStatement statement = sqlStatementFactory.preparedStatement(
sqlQueryPlus.withContext(sessionContext)
);
final DruidJdbcPreparedStatement jdbcStmt = new DruidJdbcPreparedStatement(
connectionId,
statementId,
sqlStatementFactory.preparedStatement(sqlQueryPlus),
maxRowCount
statement,
maxRowCount,
fetcherFactory
);
statements.put(statementId, jdbcStmt);
@ -156,17 +170,15 @@ public class DruidConnection
}
}
public AbstractDruidJdbcStatement getStatement(final int statementId)
public synchronized AbstractDruidJdbcStatement getStatement(final int statementId)
{
synchronized (connectionLock) {
return statements.get(statementId);
}
}
public void closeStatement(int statementId)
{
AbstractDruidJdbcStatement stmt;
synchronized (connectionLock) {
synchronized (this) {
stmt = statements.remove(statementId);
}
if (stmt != null) {
@ -180,9 +192,8 @@ public class DruidConnection
*
* @return true if closed
*/
public boolean closeIfEmpty()
public synchronized boolean closeIfEmpty()
{
synchronized (connectionLock) {
if (statements.isEmpty()) {
close();
return true;
@ -190,11 +201,9 @@ public class DruidConnection
return false;
}
}
}
public void close()
public synchronized void close()
{
synchronized (connectionLock) {
// Copy statements before iterating because statement.close() modifies it.
for (AbstractDruidJdbcStatement statement : ImmutableList.copyOf(statements.values())) {
try {
@ -208,7 +217,6 @@ public class DruidConnection
LOG.debug("Connection [%s] closed.", connectionId);
open = false;
}
}
public DruidConnection sync(final Future<?> newTimeoutFuture)
{

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.PreparedStatement;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import java.util.List;
@ -50,10 +51,11 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
final String connectionId,
final int statementId,
final PreparedStatement stmt,
final long maxRowCount
final long maxRowCount,
final ResultFetcherFactory fetcherFactory
)
{
super(connectionId, statementId);
super(connectionId, statementId, fetcherFactory);
this.sqlStatement = stmt;
this.maxRowCount = maxRowCount;
}
@ -98,7 +100,7 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
closeResultSet();
try {
DirectStatement directStmt = sqlStatement.execute(parameters);
resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount);
resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount, fetcherFactory);
resultSet.execute();
}
// Failure to execute does not close the prepared statement.

View File

@ -22,20 +22,27 @@ package org.apache.druid.sql.avatica;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.calcite.avatica.Meta;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.DirectStatement;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Druid's server-side representation of a JDBC result set. At most one
@ -59,6 +66,105 @@ import java.util.concurrent.ExecutorService;
*/
public class DruidJdbcResultSet implements Closeable
{
private static final Logger LOG = new Logger(DruidJdbcResultSet.class);
/**
* Asynchronous result fetcher. JDBC operates via REST, which is subject to
* a timeout if a query takes too long to respond. Fortunately, JDBC uses a
* batched API, and is perfectly happy to get an empty batch. This class
* runs in a separate thread to fetch a batch. If the fetch takes too long,
* the JDBC request thread will time out waiting, will return an empty batch
* to the client, and will remember the fetch for use in the next fetch
* request. The result is that the time it takes to produce results for long
* running queries is decoupled from the HTTP timeout.
*/
public static class ResultFetcher implements Callable<Meta.Frame>
{
private final int limit;
private int batchSize;
private int offset;
private Yielder<Object[]> yielder;
public ResultFetcher(
final int limit,
final Yielder<Object[]> yielder
)
{
this.limit = limit;
this.yielder = yielder;
}
/**
* In an ideal world, the batch size would be a constructor parameter. But, JDBC,
* oddly, allows a different batch size per request. Hence, we set the size using
* this method before each fetch attempt.
*/
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}
/**
* Result is only valid between executions, which turns out to be
* the only time it is called.
*/
public int offset()
{
return offset;
}
/**
* Fetch the next batch up to the batch size or EOF. Return
* the resulting frame. Exceptions are handled by the executor
* framework.
*/
@Override
public Meta.Frame call()
{
Preconditions.checkState(batchSize > 0);
int rowCount = 0;
final int batchLimit = Math.min(limit - offset, batchSize);
final List<Object> rows = new ArrayList<>(batchLimit);
while (!yielder.isDone() && rowCount < batchLimit) {
rows.add(yielder.get());
yielder = yielder.next(null);
rowCount++;
}
final Meta.Frame result = new Meta.Frame(offset, yielder.isDone(), rows);
offset += rowCount;
return result;
}
}
/**
* Creates the result fetcher and holds config. Rather overkill for production,
* but handy for testing.
*/
public static class ResultFetcherFactory
{
final int fetchTimeoutMs;
public ResultFetcherFactory(int fetchTimeoutMs)
{
// To prevent server hammering, the timeout must be at least 1 second.
this.fetchTimeoutMs = Math.max(1000, fetchTimeoutMs);
}
public int fetchTimeoutMs()
{
return fetchTimeoutMs;
}
public ResultFetcher newFetcher(
final int limit,
final Yielder<Object[]> yielder
)
{
return new ResultFetcher(limit, yielder);
}
}
/**
* Query metrics can only be used within a single thread. Because results can
* be paginated into multiple JDBC frames (each frame being processed by a
@ -77,25 +183,46 @@ public class DruidJdbcResultSet implements Closeable
* https://github.com/apache/druid/pull/4288
* https://github.com/apache/druid/pull/4415
*/
private final ExecutorService yielderOpenCloseExecutor;
private final ExecutorService queryExecutor;
private final DirectStatement stmt;
private final long maxRowCount;
private final ResultFetcherFactory fetcherFactory;
private State state = State.NEW;
private Meta.Signature signature;
private Yielder<Object[]> yielder;
private int offset;
/**
* The fetcher which reads batches of rows. Holds onto the yielder for a
* query. Maintains the current read offset.
*/
private ResultFetcher fetcher;
/**
* Future for a fetch that timed out waiting, and should be used again on
* the next fetch request.
*/
private Future<Meta.Frame> fetchFuture;
/**
* Cached version of the read offset in case the caller asks for the offset
* concurrently with a fetch which may update its own offset. This offset
* is that for the last batch that the client fetched: the fetcher itself
* may be moving to a new offset.
*/
private int nextFetchOffset;
public DruidJdbcResultSet(
final AbstractDruidJdbcStatement jdbcStatement,
final DirectStatement stmt,
final long maxRowCount
final long maxRowCount,
final ResultFetcherFactory fetcherFactory
)
{
this.stmt = stmt;
this.maxRowCount = maxRowCount;
this.yielderOpenCloseExecutor = Execs.singleThreaded(
this.fetcherFactory = fetcherFactory;
this.queryExecutor = Execs.singleThreaded(
StringUtils.format(
"JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
"JDBCQueryExecutor-connection-%s-statement-%d",
StringUtils.encodeForFormat(jdbcStatement.getConnectionId()),
jdbcStatement.getStatementId()
)
@ -107,19 +234,22 @@ public class DruidJdbcResultSet implements Closeable
ensure(State.NEW);
try {
state = State.RUNNING;
final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get().getResults();
// Execute the first step: plan the query and return a sequence to use
// to get values.
final Sequence<Object[]> sequence = queryExecutor.submit(stmt::execute).get().getResults();
// Subsequent fetch steps are done via the async "fetcher".
fetcher = fetcherFactory.newFetcher(
// We can't apply limits greater than Integer.MAX_VALUE, ignore them.
final Sequence<Object[]> retSequence =
maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
? baseSequence.limit((int) maxRowCount)
: baseSequence;
yielder = Yielders.each(retSequence);
maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int) maxRowCount : Integer.MAX_VALUE,
Yielders.each(sequence)
);
signature = AbstractDruidJdbcStatement.createSignature(
stmt.prepareResult(),
stmt.query().sql()
);
LOG.debug("Opened result set [%s]", stmt.sqlQueryId());
}
catch (ExecutionException e) {
throw closeAndPropagateThrowable(e.getCause());
@ -143,34 +273,61 @@ public class DruidJdbcResultSet implements Closeable
public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount)
{
ensure(State.RUNNING, State.DONE);
Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset);
if (fetchOffset != nextFetchOffset) {
throw new IAE(
"Druid can only fetch forward. Requested offset of %,d != current offset %,d",
fetchOffset,
nextFetchOffset
);
}
if (state == State.DONE) {
return new Meta.Frame(fetchOffset, true, Collections.emptyList());
LOG.debug("EOF at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId());
return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
}
final Future<Meta.Frame> future;
if (fetchFuture == null) {
// Not waiting on a batch. Request one now.
fetcher.setBatchSize(fetchMaxRowCount);
future = queryExecutor.submit(fetcher);
} else {
// Last batch took too long. Continue waiting for it.
future = fetchFuture;
fetchFuture = null;
}
try {
final List<Object> rows = new ArrayList<>();
while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) {
rows.add(yielder.get());
yielder = yielder.next(null);
offset++;
}
if (yielder.isDone()) {
Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS);
LOG.debug("Fetched batch at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId());
if (result.done) {
state = State.DONE;
}
return new Meta.Frame(fetchOffset, state == State.DONE, rows);
nextFetchOffset = fetcher.offset;
return result;
}
catch (Throwable t) {
throw closeAndPropagateThrowable(t);
catch (CancellationException | InterruptedException e) {
// Consider this a failure.
throw closeAndPropagateThrowable(e);
}
catch (ExecutionException e) {
// Fetch threw an error. Unwrap it.
throw closeAndPropagateThrowable(e.getCause());
}
catch (TimeoutException e) {
LOG.debug("Timeout of batch at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId());
fetchFuture = future;
// Wait timed out. Return 0 rows: the client will try again later.
// We'll wait again on this same fetch next time.
// Note that when the next fetch request comes, it will use the batch
// size set here: any change in size will be ignored for the in-flight batch.
// Changing batch size mid-query is an odd case: it will probably never happen.
return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());
}
}
public synchronized long getCurrentOffset()
{
ensure(State.RUNNING, State.DONE);
return offset;
return fetcher.offset;
}
@GuardedBy("this")
@ -215,14 +372,28 @@ public class DruidJdbcResultSet implements Closeable
if (state == State.CLOSED || state == State.FAILED) {
return;
}
LOG.debug("Closing result set [%s]", stmt.sqlQueryId());
state = State.CLOSED;
try {
if (yielder != null) {
Yielder<Object[]> theYielder = this.yielder;
this.yielder = null;
// If a fetch is in progress, wait for it to complete.
if (fetchFuture != null) {
try {
fetchFuture.cancel(true);
fetchFuture.get();
}
catch (Exception e) {
// Ignore, we're shutting down anyway.
}
finally {
fetchFuture = null;
}
}
if (fetcher != null) {
Yielder<Object[]> theYielder = fetcher.yielder;
fetcher = null;
// Put the close last, so any exceptions it throws are after we did the other cleanup above.
yielderOpenCloseExecutor.submit(
queryExecutor.submit(
() -> {
theYielder.close();
// makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda
@ -230,7 +401,7 @@ public class DruidJdbcResultSet implements Closeable
}
).get();
yielderOpenCloseExecutor.shutdownNow();
queryExecutor.shutdownNow();
}
}
catch (RuntimeException e) {

View File

@ -24,6 +24,9 @@ import org.apache.calcite.avatica.Meta;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import java.util.Map;
/**
* Represents Druid's version of the JDBC {@code Statement} class:
@ -36,22 +39,27 @@ import org.apache.druid.sql.SqlStatementFactory;
public class DruidJdbcStatement extends AbstractDruidJdbcStatement
{
private final SqlStatementFactory lifecycleFactory;
protected final Map<String, Object> queryContext;
public DruidJdbcStatement(
final String connectionId,
final int statementId,
final SqlStatementFactory lifecycleFactory
final Map<String, Object> queryContext,
final SqlStatementFactory lifecycleFactory,
final ResultFetcherFactory fetcherFactory
)
{
super(connectionId, statementId);
super(connectionId, statementId, fetcherFactory);
this.queryContext = queryContext;
this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory, "lifecycleFactory");
}
public synchronized void execute(SqlQueryPlus queryPlus, long maxRowCount)
{
closeResultSet();
queryPlus = queryPlus.withContext(queryContext);
DirectStatement stmt = lifecycleFactory.directStatement(queryPlus);
resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE);
resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE, fetcherFactory);
try {
resultSet.execute();
}

View File

@ -49,6 +49,7 @@ import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.joda.time.Interval;
@ -95,7 +96,20 @@ public class DruidMeta extends MetaImpl
*/
public static <T extends Throwable> T logFailure(T error)
{
if (error instanceof NoSuchConnectionException) {
NoSuchConnectionException ex = (NoSuchConnectionException) error;
logFailure(error, "No such connection: %s", ex.getConnectionId());
} else if (error instanceof NoSuchStatementException) {
NoSuchStatementException ex = (NoSuchStatementException) error;
logFailure(
error,
"No such statement: %s, %d",
ex.getStatementHandle().connectionId,
ex.getStatementHandle().id
);
} else {
logFailure(error, error.getMessage());
}
return error;
}
@ -115,6 +129,7 @@ public class DruidMeta extends MetaImpl
private final AvaticaServerConfig config;
private final List<Authenticator> authenticators;
private final ErrorHandler errorHandler;
private final ResultFetcherFactory fetcherFactory;
/**
* Tracks logical connections.
@ -145,7 +160,8 @@ public class DruidMeta extends MetaImpl
.setDaemon(true)
.build()
),
authMapper.getAuthenticatorChain()
authMapper.getAuthenticatorChain(),
new ResultFetcherFactory(config.getFetchTimeoutMs())
);
}
@ -154,7 +170,8 @@ public class DruidMeta extends MetaImpl
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final ScheduledExecutorService exec,
final List<Authenticator> authenticators
final List<Authenticator> authenticators,
final ResultFetcherFactory fetcherFactory
)
{
super(null);
@ -163,6 +180,7 @@ public class DruidMeta extends MetaImpl
this.errorHandler = errorHandler;
this.exec = exec;
this.authenticators = authenticators;
this.fetcherFactory = fetcherFactory;
}
@Override
@ -188,11 +206,6 @@ public class DruidMeta extends MetaImpl
try {
openDruidConnection(ch.id, secret, contextMap);
}
catch (NoSuchConnectionException e) {
// Avoid sanitizing Avatica specific exceptions so that the Avatica code
// can rely on them to handle issues in a JDBC-specific way.
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -208,9 +221,6 @@ public class DruidMeta extends MetaImpl
druidConnection.close();
}
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -224,9 +234,6 @@ public class DruidMeta extends MetaImpl
getDruidConnection(ch.id);
return connProps;
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -241,12 +248,10 @@ public class DruidMeta extends MetaImpl
public StatementHandle createStatement(final ConnectionHandle ch)
{
try {
final DruidJdbcStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlStatementFactory);
final DruidJdbcStatement druidStatement = getDruidConnection(ch.id)
.createStatement(sqlStatementFactory, fetcherFactory);
return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -275,15 +280,13 @@ public class DruidMeta extends MetaImpl
final DruidJdbcPreparedStatement stmt = getDruidConnection(ch.id).createPreparedStatement(
sqlStatementFactory,
sqlReq,
maxRowCount
maxRowCount,
fetcherFactory
);
stmt.prepare();
LOG.debug("Successfully prepared statement [%s] for execution", stmt.getStatementId());
return new StatementHandle(ch.id, stmt.getStatementId(), stmt.getSignature());
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -315,7 +318,7 @@ public class DruidMeta extends MetaImpl
}
/**
* Prepares and executes a JDBC {@code Statement}
* Prepares and executes a JDBC {@code Statement}.
*/
@Override
public ExecuteResult prepareAndExecute(
@ -324,26 +327,25 @@ public class DruidMeta extends MetaImpl
final long maxRowCount,
final int maxRowsInFirstFrame,
final PrepareCallback callback
) throws NoSuchStatementException
)
{
try {
// Ignore "callback", this class is designed for use with LocalService which doesn't use it.
final DruidJdbcStatement druidStatement = getDruidStatement(statement, DruidJdbcStatement.class);
final DruidConnection druidConnection = getDruidConnection(statement.connectionId);
AuthenticationResult authenticationResult = doAuthenticate(druidConnection);
SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
// This method is called directly from the Avatica server: it does not go
// through the connection first. We must lock the connection here to prevent race conditions.
synchronized (druidConnection) {
final AuthenticationResult authenticationResult = doAuthenticate(druidConnection);
final SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
.auth(authenticationResult)
.context(druidConnection.sessionContext())
.build();
druidStatement.execute(sqlRequest, maxRowCount);
ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
final ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
LOG.debug("Successfully prepared statement [%s] and started execution", druidStatement.getStatementId());
return result;
}
// Cannot affect these exceptions as Avatica handles them.
catch (NoSuchConnectionException | NoSuchStatementException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
@ -357,6 +359,15 @@ public class DruidMeta extends MetaImpl
*/
private RuntimeException mapException(Throwable t)
{
// Don't sanitize or wrap Avatica exceptions: these exceptions
// are handled specially by Avatica to provide SQLState, Error Code
// and other JDBC-specific items.
if (t instanceof AvaticaRuntimeException) {
throw (AvaticaRuntimeException) t;
}
if (t instanceof NoSuchConnectionException) {
throw (NoSuchConnectionException) t;
}
// BasicSecurityAuthenticationException is not visible here.
String className = t.getClass().getSimpleName();
if (t instanceof ForbiddenException ||
@ -365,7 +376,8 @@ public class DruidMeta extends MetaImpl
t.getMessage(),
ErrorResponse.UNAUTHORIZED_ERROR_CODE,
ErrorResponse.UNAUTHORIZED_SQL_STATE,
AvaticaSeverity.ERROR);
AvaticaSeverity.ERROR
);
}
// Let Avatica do its default mapping.
@ -425,9 +437,6 @@ public class DruidMeta extends MetaImpl
LOG.debug("Fetching next frame from offset %,d with %,d rows for statement [%s]", offset, maxRows, statement.id);
return getDruidStatement(statement, AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -450,7 +459,7 @@ public class DruidMeta extends MetaImpl
final StatementHandle statement,
final List<TypedValue> parameterValues,
final int maxRowsInFirstFrame
) throws NoSuchStatementException
)
{
try {
final DruidJdbcPreparedStatement druidStatement =
@ -462,9 +471,6 @@ public class DruidMeta extends MetaImpl
druidStatement.getStatementId());
return result;
}
catch (NoSuchStatementException | NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -493,9 +499,6 @@ public class DruidMeta extends MetaImpl
druidConnection.closeStatement(h.id);
}
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -506,7 +509,7 @@ public class DruidMeta extends MetaImpl
final StatementHandle sh,
final QueryState state,
final long offset
) throws NoSuchStatementException
)
{
try {
final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh, AbstractDruidJdbcStatement.class);
@ -521,9 +524,6 @@ public class DruidMeta extends MetaImpl
}
return !isDone;
}
catch (NoSuchStatementException | NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -560,9 +560,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -597,9 +594,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -656,9 +650,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -726,9 +717,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}
@ -747,9 +735,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
catch (NoSuchConnectionException e) {
throw e;
}
catch (Throwable t) {
throw mapException(t);
}

View File

@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
@ -67,6 +68,9 @@ import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcher;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@ -90,11 +94,12 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.ResultIterator;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@ -117,30 +122,27 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
/**
* Tests the Avatica-based JDBC implementation using JSON serialization. See
* {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs
* this same set of tests using Protobuf serialization.
* To run this in an IDE, set {@code -Duser.timezone=UTC}.
*/
public class DruidAvaticaHandlerTest extends CalciteTestBase
{
private static final AvaticaServerConfig AVATICA_CONFIG = new AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
private static final int CONNECTION_LIMIT = 4;
private static final int STATEMENT_LIMIT = 4;
private static final AvaticaServerConfig AVATICA_CONFIG;
static {
AVATICA_CONFIG = new AvaticaServerConfig();
// This must match the number of Connection objects created in testTooManyStatements()
return 4;
AVATICA_CONFIG.maxConnections = CONNECTION_LIMIT;
AVATICA_CONFIG.maxStatementsPerConnection = STATEMENT_LIMIT;
}
@Override
public int getMaxStatementsPerConnection()
{
return 4;
}
};
private static final String DUMMY_SQL_QUERY_ID = "dummy";
private static QueryRunnerFactoryConglomerate conglomerate;
@ -162,35 +164,99 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
resourceCloser.close();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private final PlannerConfig plannerConfig = new PlannerConfig();
private final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
private final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
private SpecificSegmentsQuerySegmentWalker walker;
private Server server;
private ServerWrapper server;
private Connection client;
private Connection clientNoTrailingSlash;
private Connection superuserClient;
private Connection clientLosAngeles;
private DruidMeta druidMeta;
private String url;
private Injector injector;
private TestRequestLogger testRequestLogger;
private DruidSchemaCatalog makeRootSchema()
{
return CalciteTests.createMockRootSchema(
conglomerate,
walker,
plannerConfig,
CalciteTests.TEST_AUTHORIZER_MAPPER
);
}
private class ServerWrapper
{
final DruidMeta druidMeta;
final Server server;
final String url;
ServerWrapper(final DruidMeta druidMeta) throws Exception
{
this.druidMeta = druidMeta;
server = new Server(0);
server.setHandler(getAvaticaHandler(druidMeta));
server.start();
url = StringUtils.format(
"jdbc:avatica:remote:url=%s%s",
server.getURI().toString(),
StringUtils.maybeRemoveLeadingSlash(getJdbcUrlTail())
);
}
public Connection getConnection(String user, String password) throws SQLException
{
return DriverManager.getConnection(url, user, password);
}
public Connection getUserConnection() throws SQLException
{
return getConnection("regularUser", "druid");
}
// Note: though the URL-only form is OK in general, but it will cause tests
// to crash as the mock auth test code needs the user name.
// Use getUserConnection() instead, or create a URL that includes the
// user name and password.
//public Connection getConnection() throws SQLException
//{
// return DriverManager.getConnection(url);
//}
public void close() throws Exception
{
druidMeta.closeAllConnections();
server.stop();
}
}
protected String getJdbcUrlTail()
{
return DruidAvaticaJsonHandler.AVATICA_PATH;
}
// Default implementation is for JSON to allow debugging of tests.
protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
{
return new DruidAvaticaJsonHandler(
druidMeta,
new DruidNode("dummy", "dummy", false, 1, null, true, false),
new AvaticaMonitor()
);
}
@Before
public void setUp() throws Exception
{
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, CalciteTests.TEST_AUTHORIZER_MAPPER);
final DruidSchemaCatalog rootSchema = makeRootSchema();
testRequestLogger = new TestRequestLogger();
injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
@ -227,38 +293,35 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
)
.build();
druidMeta = injector.getInstance(DruidMeta.class);
final AbstractAvaticaHandler handler = this.getAvaticaHandler(druidMeta);
final int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
server = new Server(new InetSocketAddress("127.0.0.1", port));
server.setHandler(handler);
server.start();
url = this.getJdbcConnectionString(port);
client = DriverManager.getConnection(url, "regularUser", "druid");
superuserClient = DriverManager.getConnection(url, CalciteTests.TEST_SUPERUSER_NAME, "druid");
clientNoTrailingSlash = DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(url), CalciteTests.TEST_SUPERUSER_NAME, "druid");
DruidMeta druidMeta = injector.getInstance(DruidMeta.class);
server = new ServerWrapper(druidMeta);
client = server.getUserConnection();
superuserClient = server.getConnection(CalciteTests.TEST_SUPERUSER_NAME, "druid");
clientNoTrailingSlash = DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(server.url), CalciteTests.TEST_SUPERUSER_NAME, "druid");
final Properties propertiesLosAngeles = new Properties();
propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
propertiesLosAngeles.setProperty("user", "regularUserLA");
propertiesLosAngeles.setProperty(BaseQuery.SQL_QUERY_ID, DUMMY_SQL_QUERY_ID);
clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
clientLosAngeles = DriverManager.getConnection(server.url, propertiesLosAngeles);
}
@After
public void tearDown() throws Exception
{
if (server != null) {
client.close();
clientLosAngeles.close();
clientNoTrailingSlash.close();
server.stop();
walker.close();
walker = null;
server.close();
client = null;
clientLosAngeles = null;
clientNoTrailingSlash = null;
server = null;
}
walker.close();
walker = null;
}
@Test
public void testSelectCount() throws SQLException
@ -772,19 +835,21 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
@Test
public void testTooManyStatements() throws SQLException
{
for (int i = 0; i < 4; i++) {
for (int i = 0; i < STATEMENT_LIMIT; i++) {
client.createStatement();
}
expectedException.expect(AvaticaClientRuntimeException.class);
expectedException.expectMessage("Too many open statements, limit is 4");
client.createStatement();
AvaticaClientRuntimeException ex = Assert.assertThrows(
AvaticaClientRuntimeException.class,
() -> client.createStatement()
);
Assert.assertTrue(ex.getMessage().contains("Too many open statements, limit is 4"));
}
@Test
public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException
{
for (int i = 0; i < 10; i++) {
for (int i = 0; i < STATEMENT_LIMIT * 2; i++) {
client.createStatement().close();
}
}
@ -863,7 +928,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
ImmutableList.of(ImmutableMap.of("cnt", 6L)),
getRows(resultSet)
);
druidMeta.closeAllConnections();
server.druidMeta.closeAllConnections();
}
}
@ -875,57 +940,44 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
superuserClient.createStatement();
clientNoTrailingSlash.createStatement();
expectedException.expect(AvaticaClientRuntimeException.class);
expectedException.expectMessage("Too many connections");
DriverManager.getConnection(url);
AvaticaClientRuntimeException ex = Assert.assertThrows(
AvaticaClientRuntimeException.class,
() -> server.getUserConnection()
);
Assert.assertTrue(ex.getMessage().contains("Too many connections"));
}
@Test
public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException
public void testNotTooManyConnectionsWhenTheyAreClosed() throws SQLException
{
for (int i = 0; i < 4; i++) {
try (Connection connection = DriverManager.getConnection(url)) {
for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
try (Connection connection = server.getUserConnection()) {
}
}
}
@Test
public void testMaxRowsPerFrame() throws Exception
public void testConnectionsCloseStatements() throws SQLException
{
final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
return 2;
for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
try (Connection connection = server.getUserConnection()) {
// Note: NOT in a try-catch block. Let the connection close the statement
final Statement statement = connection.createStatement();
// Again, NOT in a try-catch block: let the statement close the
// result set.
final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo");
Assert.assertTrue(resultSet.next());
}
}
}
@Override
public int getMaxStatementsPerConnection()
private SqlStatementFactory makeStatementFactory()
{
return 4;
}
@Override
public int getMaxRowsPerFrame()
{
return 2;
}
};
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlStatementFactory(
return CalciteTests.createSqlStatementFactory(
CalciteTests.createMockSqlEngine(walker, conglomerate),
new PlannerFactory(
rootSchema,
makeRootSchema(),
operatorTable,
macroTable,
plannerConfig,
@ -934,11 +986,26 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,
);
}
@Test
public void testMaxRowsPerFrame() throws Exception
{
final AvaticaServerConfig config = new AvaticaServerConfig();
config.maxConnections = 2;
config.maxStatementsPerConnection = STATEMENT_LIMIT;
config.maxRowsPerFrame = 2;
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidMeta smallFrameDruidMeta = new DruidMeta(
makeStatementFactory(),
config,
new ErrorHandler(new ServerConfig()),
exec,
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
new ResultFetcherFactory(config.fetchTimeoutMs)
)
{
@Override
@ -955,13 +1022,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
}
};
final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta);
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
smallFrameServer.setHandler(handler);
smallFrameServer.start();
String smallFrameUrl = this.getJdbcConnectionString(port);
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
Connection smallFrameClient = server.getUserConnection();
final ResultSet resultSet = smallFrameClient.createStatement().executeQuery(
"SELECT dim1 FROM druid.foo"
@ -980,59 +1042,29 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
rows
);
resultSet.close();
smallFrameClient.close();
exec.shutdown();
server.close();
}
@Test
public void testMinRowsPerFrame() throws Exception
{
final int minFetchSize = 1000;
final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
{
@Override
public int getMaxConnections()
{
return 2;
}
final AvaticaServerConfig config = new AvaticaServerConfig();
config.maxConnections = 2;
config.maxStatementsPerConnection = STATEMENT_LIMIT;
config.minRowsPerFrame = 1000;
@Override
public int getMaxStatementsPerConnection()
{
return 4;
}
@Override
public int getMinRowsPerFrame()
{
return minFetchSize;
}
};
final PlannerConfig plannerConfig = new PlannerConfig();
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
CalciteTests.createSqlStatementFactory(
CalciteTests.createMockSqlEngine(walker, conglomerate),
new PlannerFactory(
rootSchema,
operatorTable,
macroTable,
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,
makeStatementFactory(),
config,
new ErrorHandler(new ServerConfig()),
exec,
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
new ResultFetcherFactory(config.fetchTimeoutMs)
)
{
@Override
@ -1043,20 +1075,15 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
) throws NoSuchStatementException, MissingResultsException
{
// overriding fetch allows us to track how many frames are processed after the first frame, and also fetch size
Assert.assertEquals(minFetchSize, fetchMaxRowCount);
Assert.assertEquals(config.minRowsPerFrame, fetchMaxRowCount);
Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
frames.add(frame);
return frame;
}
};
final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta);
final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port));
smallFrameServer.setHandler(handler);
smallFrameServer.start();
String smallFrameUrl = this.getJdbcConnectionString(port);
Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid");
ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
Connection smallFrameClient = server.getUserConnection();
// use a prepared statement because Avatica currently ignores fetchSize on the initial fetch of a Statement
PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo");
@ -1078,7 +1105,10 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
rows
);
resultSet.close();
smallFrameClient.close();
exec.shutdown();
server.close();
}
@Test
@ -1546,24 +1576,124 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
Assert.fail("Test failed, did not get SQLException");
}
// Default implementation is for JSON to allow debugging of tests.
protected String getJdbcConnectionString(final int port)
private static class TestResultFetcher extends ResultFetcher
{
return StringUtils.format(
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
port,
DruidAvaticaJsonHandler.AVATICA_PATH
);
public TestResultFetcher(int limit, Yielder<Object[]> yielder)
{
super(limit, yielder);
}
// Default implementation is for JSON to allow debugging of tests.
protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
@Override
public Meta.Frame call()
{
return new DruidAvaticaJsonHandler(
druidMeta,
new DruidNode("dummy", "dummy", false, 1, null, true, false),
new AvaticaMonitor()
);
try {
if (offset() == 0) {
System.out.println("Taking a nap now...");
Thread.sleep(3000);
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.call();
}
}
/**
* Test the async aspect of the Avatica implementation. The fetch of the
* first batch takes 3 seconds (due to a sleep). However, the client will
* wait only 1 second. So, we should get ~3 empty batches before we get
* the first batch with rows.
*/
@Test
public void testAsync() throws Exception
{
final AvaticaServerConfig config = new AvaticaServerConfig();
config.maxConnections = CONNECTION_LIMIT;
config.maxStatementsPerConnection = STATEMENT_LIMIT;
config.maxRowsPerFrame = 2;
config.fetchTimeoutMs = 1000;
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
DruidMeta druidMeta = new DruidMeta(
makeStatementFactory(),
config,
new ErrorHandler(new ServerConfig()),
exec,
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
new ResultFetcherFactory(config.fetchTimeoutMs) {
@Override
public ResultFetcher newFetcher(
final int limit,
final Yielder<Object[]> yielder
)
{
return new TestResultFetcher(limit, yielder);
}
}
)
{
@Override
public Frame fetch(
final StatementHandle statement,
final long offset,
final int fetchMaxRowCount
) throws NoSuchStatementException, MissingResultsException
{
Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
frames.add(frame);
return frame;
}
};
ServerWrapper server = new ServerWrapper(druidMeta);
try (Connection conn = server.getUserConnection()) {
// Test with plain JDBC
try (ResultSet resultSet = conn.createStatement().executeQuery(
"SELECT dim1 FROM druid.foo")) {
List<Map<String, Object>> rows = getRows(resultSet);
Assert.assertEquals(6, rows.size());
Assert.assertTrue(frames.size() > 3);
// There should be at least one empty frame due to timeout
Assert.assertFalse(frames.get(0).rows.iterator().hasNext());
}
}
testWithJDBI(server.url);
exec.shutdown();
server.close();
}
// Test the async feature using DBI, as used internally in Druid.
// Ensures that DBI knows how to handle empty batches (which should,
// in reality, but handled at the JDBC level below DBI.)
private void testWithJDBI(String baseUrl)
{
String url = baseUrl + "?user=regularUser&password=druid" + getJdbcUrlTail();
System.out.println(url);
DBI dbi = new DBI(url);
Handle handle = dbi.open();
try {
ResultIterator<Pair<Long, String>> iter = handle
.createQuery("SELECT __time, dim1 FROM druid.foo")
.map((index, row, ctx) -> new Pair<>(row.getLong(1), row.getString(2)))
.iterator();
int count = 0;
while (iter.hasNext()) {
Pair<Long, String> row = iter.next();
Assert.assertNotNull(row.lhs);
Assert.assertNotNull(row.rhs);
count++;
}
Assert.assertEquals(6, count);
}
finally {
handle.close();
}
}
private static List<Map<String, Object>> getRows(final ResultSet resultSet) throws SQLException

View File

@ -26,11 +26,10 @@ import org.apache.druid.server.DruidNode;
public class DruidAvaticaProtobufHandlerTest extends DruidAvaticaHandlerTest
{
@Override
protected String getJdbcConnectionString(final int port)
protected String getJdbcUrlTail()
{
return StringUtils.format(
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s;serialization=protobuf",
port,
"%s;serialization=protobuf",
DruidAvaticaProtobufHandler.AVATICA_PATH
);
}

View File

@ -35,6 +35,7 @@ import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@ -138,7 +139,9 @@ public class DruidStatementTest extends CalciteTestBase
return new DruidJdbcStatement(
"",
0,
sqlStatementFactory
Collections.emptyMap(),
sqlStatementFactory,
new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
);
}
@ -517,7 +520,8 @@ public class DruidStatementTest extends CalciteTestBase
"",
0,
sqlStatementFactory.preparedStatement(queryPlus),
Long.MAX_VALUE
Long.MAX_VALUE,
new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
);
}

View File

@ -185,19 +185,19 @@ public class CalciteTests
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) {
if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) {
return Access.OK;
}
switch (resource.getType()) {
case ResourceType.DATASOURCE:
if (resource.getName().equals(FORBIDDEN_DATASOURCE)) {
if (FORBIDDEN_DATASOURCE.equals(resource.getName())) {
return new Access(false);
} else {
return Access.OK;
}
case ResourceType.VIEW:
if (resource.getName().equals("forbiddenView")) {
if ("forbiddenView".equals(resource.getName())) {
return new Access(false);
} else {
return Access.OK;