From b34b4353f4a27065b37feac97995d4984334f8ed Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Tue, 18 Oct 2022 11:40:57 -0700 Subject: [PATCH] Async reads for JDBC (#13196) Async reads for JDBC: Prevents JDBC timeouts on long queries by returning empty batches when a batch fetch takes too long. Uses an async model to run the result fetch concurrently with JDBC requests. Fixed race condition in Druid's Avatica server-side handler Fixed issue with no-user connections --- .../java/util/common/concurrent/Execs.java | 1 - docs/configuration/index.md | 1 + .../server/AsyncQueryForwardingServlet.java | 2 +- sql/pom.xml | 6 +- .../avatica/AbstractDruidJdbcStatement.java | 6 +- .../sql/avatica/AvaticaServerConfig.java | 17 + .../sql/avatica/DruidAvaticaJsonHandler.java | 2 +- .../avatica/DruidAvaticaProtobufHandler.java | 2 +- .../druid/sql/avatica/DruidConnection.java | 86 ++-- .../avatica/DruidJdbcPreparedStatement.java | 8 +- .../druid/sql/avatica/DruidJdbcResultSet.java | 237 +++++++-- .../druid/sql/avatica/DruidJdbcStatement.java | 14 +- .../apache/druid/sql/avatica/DruidMeta.java | 123 ++--- .../sql/avatica/DruidAvaticaHandlerTest.java | 470 +++++++++++------- .../DruidAvaticaProtobufHandlerTest.java | 5 +- .../druid/sql/avatica/DruidStatementTest.java | 8 +- .../druid/sql/calcite/util/CalciteTests.java | 6 +- 17 files changed, 663 insertions(+), 331 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java index a310e567560..c5bc20f45f2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java +++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; */ public class Execs { - /** * Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks. */ diff --git a/docs/configuration/index.md b/docs/configuration/index.md index cb826ef348a..6e92076eb7b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100| |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4| |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M| +|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. When a request for the next batch of data takes longer than this time, Druid returns an empty result set, causing the client to poll again. This avoids HTTP timeouts for long-running queries. The default of 5 sec. is good for most cases. |5000| |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true| |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000| |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M| diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index fa3b52669b6..75b13a39f1f 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -402,7 +402,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu HttpServletResponse response ) throws ServletException, IOException { - // Just call the superclass service method. Overriden in tests. + // Just call the superclass service method. Overridden in tests. super.service(request, response); } diff --git a/sql/pom.xml b/sql/pom.xml index bbf2eb049ed..28f7eecffc4 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -247,9 +247,13 @@ org.mockito mockito-core - ${mockito.version} test + + org.jdbi + jdbi + test + diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java index 697ad1ca172..1992c1620b1 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java @@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PrepareResult; @@ -56,16 +57,19 @@ public abstract class AbstractDruidJdbcStatement implements Closeable protected final String connectionId; protected final int statementId; + protected final ResultFetcherFactory fetcherFactory; protected Throwable throwable; protected DruidJdbcResultSet resultSet; public AbstractDruidJdbcStatement( final String connectionId, - final int statementId + final int statementId, + final ResultFetcherFactory fetcherFactory ) { this.connectionId = Preconditions.checkNotNull(connectionId, "connectionId"); this.statementId = statementId; + this.fetcherFactory = fetcherFactory; } protected static Meta.Signature createSignature( diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java index e931c3a289e..a5215c19c08 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java @@ -30,6 +30,7 @@ class AvaticaServerConfig public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M"); public static int DEFAULT_MIN_ROWS_PER_FRAME = 100; public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000; + public static int DEFAULT_FETCH_TIMEOUT_MS = 5000; @JsonProperty public int maxConnections = DEFAULT_MAX_CONNECTIONS; @@ -46,6 +47,17 @@ class AvaticaServerConfig @JsonProperty public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME; + /** + * The maximum amount of time to wait per-fetch for the next result set. + * If a query takes longer than this amount of time, then the fetch will + * return 0 rows, without EOF, and the client will automatically try + * another fetch. The result is an async protocol that avoids network + * timeouts for long-running queries, especially those that take a long + * time to deliver a first batch of results. + */ + @JsonProperty + public int fetchTimeoutMs = DEFAULT_FETCH_TIMEOUT_MS; + public int getMaxConnections() { return maxConnections; @@ -77,4 +89,9 @@ class AvaticaServerConfig } return minRowsPerFrame; } + + public int getFetchTimeoutMs() + { + return fetchTimeoutMs; + } } diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java index e2d223d1266..4f1a5818bf4 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java @@ -35,8 +35,8 @@ import java.io.IOException; public class DruidAvaticaJsonHandler extends AvaticaJsonHandler { - public static final String AVATICA_PATH = "/druid/v2/sql/avatica/"; public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica"; + public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/"; @Inject public DruidAvaticaJsonHandler( diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java index 50c54ad27ac..a15efadda6f 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java @@ -35,8 +35,8 @@ import java.io.IOException; public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler { - public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/"; public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica-protobuf"; + public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/"; @Inject public DruidAvaticaProtobufHandler( diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java index 23f1a222dd0..5aa3f422b7f 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java @@ -24,8 +24,10 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.sql.PreparedStatement; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import java.util.Collections; import java.util.Map; @@ -37,6 +39,11 @@ import java.util.concurrent.atomic.AtomicReference; /** * Connection tracking for {@link DruidMeta}. Thread-safe. + *

+ * Lock is the instance itself. Used here to protect two members, and in + * other code when we must resolve the connection after resolving the statement. + * The lock prevents closing the connection concurrently with an operation on + * a statement for that connection. */ public class DruidConnection { @@ -56,13 +63,12 @@ public class DruidConnection private final AtomicInteger statementCounter = new AtomicInteger(); private final AtomicReference> timeoutFuture = new AtomicReference<>(); - // Typically synchronized by connectionLock, except in one case: the onClose function passed + // Typically synchronized by this instance, except in one case: the onClose function passed // into DruidStatements contained by the map. - @GuardedBy("connectionLock") + @GuardedBy("this") private final ConcurrentMap statements = new ConcurrentHashMap<>(); - private final Object connectionLock = new Object(); - @GuardedBy("connectionLock") + @GuardedBy("this") private boolean open = true; public DruidConnection( @@ -93,13 +99,14 @@ public class DruidConnection return userSecret; } - public DruidJdbcStatement createStatement( - final SqlStatementFactory sqlStatementFactory + public synchronized DruidJdbcStatement createStatement( + final SqlStatementFactory sqlStatementFactory, + final ResultFetcherFactory fetcherFactory ) { final int statementId = statementCounter.incrementAndGet(); - synchronized (connectionLock) { + synchronized (this) { if (statements.containsKey(statementId)) { // Will only happen if statementCounter rolls over before old statements are cleaned up. If this // ever happens then something fishy is going on, because we shouldn't have billions of statements. @@ -114,7 +121,9 @@ public class DruidConnection final DruidJdbcStatement statement = new DruidJdbcStatement( connectionId, statementId, - sqlStatementFactory + sessionContext, + sqlStatementFactory, + fetcherFactory ); statements.put(statementId, statement); @@ -123,15 +132,16 @@ public class DruidConnection } } - public DruidJdbcPreparedStatement createPreparedStatement( + public synchronized DruidJdbcPreparedStatement createPreparedStatement( final SqlStatementFactory sqlStatementFactory, final SqlQueryPlus sqlQueryPlus, - final long maxRowCount + final long maxRowCount, + final ResultFetcherFactory fetcherFactory ) { final int statementId = statementCounter.incrementAndGet(); - synchronized (connectionLock) { + synchronized (this) { if (statements.containsKey(statementId)) { // Will only happen if statementCounter rolls over before old statements are cleaned up. If this // ever happens then something fishy is going on, because we shouldn't have billions of statements. @@ -143,11 +153,15 @@ public class DruidConnection } @SuppressWarnings("GuardedBy") + final PreparedStatement statement = sqlStatementFactory.preparedStatement( + sqlQueryPlus.withContext(sessionContext) + ); final DruidJdbcPreparedStatement jdbcStmt = new DruidJdbcPreparedStatement( connectionId, statementId, - sqlStatementFactory.preparedStatement(sqlQueryPlus), - maxRowCount + statement, + maxRowCount, + fetcherFactory ); statements.put(statementId, jdbcStmt); @@ -156,17 +170,15 @@ public class DruidConnection } } - public AbstractDruidJdbcStatement getStatement(final int statementId) + public synchronized AbstractDruidJdbcStatement getStatement(final int statementId) { - synchronized (connectionLock) { - return statements.get(statementId); - } + return statements.get(statementId); } public void closeStatement(int statementId) { AbstractDruidJdbcStatement stmt; - synchronized (connectionLock) { + synchronized (this) { stmt = statements.remove(statementId); } if (stmt != null) { @@ -180,34 +192,30 @@ public class DruidConnection * * @return true if closed */ - public boolean closeIfEmpty() + public synchronized boolean closeIfEmpty() { - synchronized (connectionLock) { - if (statements.isEmpty()) { - close(); - return true; - } else { - return false; - } + if (statements.isEmpty()) { + close(); + return true; + } else { + return false; } } - public void close() + public synchronized void close() { - synchronized (connectionLock) { - // Copy statements before iterating because statement.close() modifies it. - for (AbstractDruidJdbcStatement statement : ImmutableList.copyOf(statements.values())) { - try { - statement.close(); - } - catch (Exception e) { - LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId()); - } + // Copy statements before iterating because statement.close() modifies it. + for (AbstractDruidJdbcStatement statement : ImmutableList.copyOf(statements.values())) { + try { + statement.close(); + } + catch (Exception e) { + LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId()); } - - LOG.debug("Connection [%s] closed.", connectionId); - open = false; } + + LOG.debug("Connection [%s] closed.", connectionId); + open = false; } public DruidConnection sync(final Future newTimeoutFuture) diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java index 3cd608addb7..dcd599c5428 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.DirectStatement; import org.apache.druid.sql.PreparedStatement; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import org.apache.druid.sql.calcite.planner.PrepareResult; import java.util.List; @@ -50,10 +51,11 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement final String connectionId, final int statementId, final PreparedStatement stmt, - final long maxRowCount + final long maxRowCount, + final ResultFetcherFactory fetcherFactory ) { - super(connectionId, statementId); + super(connectionId, statementId, fetcherFactory); this.sqlStatement = stmt; this.maxRowCount = maxRowCount; } @@ -98,7 +100,7 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement closeResultSet(); try { DirectStatement directStmt = sqlStatement.execute(parameters); - resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount); + resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount, fetcherFactory); resultSet.execute(); } // Failure to execute does not close the prepared statement. diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java index 2b494015525..1eb0d1aa5ef 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java @@ -22,20 +22,27 @@ package org.apache.druid.sql.avatica; import com.google.common.base.Preconditions; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.calcite.avatica.Meta; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.sql.DirectStatement; import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Druid's server-side representation of a JDBC result set. At most one @@ -59,6 +66,105 @@ import java.util.concurrent.ExecutorService; */ public class DruidJdbcResultSet implements Closeable { + private static final Logger LOG = new Logger(DruidJdbcResultSet.class); + + /** + * Asynchronous result fetcher. JDBC operates via REST, which is subject to + * a timeout if a query takes too long to respond. Fortunately, JDBC uses a + * batched API, and is perfectly happy to get an empty batch. This class + * runs in a separate thread to fetch a batch. If the fetch takes too long, + * the JDBC request thread will time out waiting, will return an empty batch + * to the client, and will remember the fetch for use in the next fetch + * request. The result is that the time it takes to produce results for long + * running queries is decoupled from the HTTP timeout. + */ + public static class ResultFetcher implements Callable + { + private final int limit; + private int batchSize; + private int offset; + private Yielder yielder; + + public ResultFetcher( + final int limit, + final Yielder yielder + ) + { + this.limit = limit; + this.yielder = yielder; + } + + /** + * In an ideal world, the batch size would be a constructor parameter. But, JDBC, + * oddly, allows a different batch size per request. Hence, we set the size using + * this method before each fetch attempt. + */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + /** + * Result is only valid between executions, which turns out to be + * the only time it is called. + */ + public int offset() + { + return offset; + } + + /** + * Fetch the next batch up to the batch size or EOF. Return + * the resulting frame. Exceptions are handled by the executor + * framework. + */ + @Override + public Meta.Frame call() + { + Preconditions.checkState(batchSize > 0); + int rowCount = 0; + final int batchLimit = Math.min(limit - offset, batchSize); + final List rows = new ArrayList<>(batchLimit); + while (!yielder.isDone() && rowCount < batchLimit) { + rows.add(yielder.get()); + yielder = yielder.next(null); + rowCount++; + } + + final Meta.Frame result = new Meta.Frame(offset, yielder.isDone(), rows); + offset += rowCount; + return result; + } + } + + /** + * Creates the result fetcher and holds config. Rather overkill for production, + * but handy for testing. + */ + public static class ResultFetcherFactory + { + final int fetchTimeoutMs; + + public ResultFetcherFactory(int fetchTimeoutMs) + { + // To prevent server hammering, the timeout must be at least 1 second. + this.fetchTimeoutMs = Math.max(1000, fetchTimeoutMs); + } + + public int fetchTimeoutMs() + { + return fetchTimeoutMs; + } + + public ResultFetcher newFetcher( + final int limit, + final Yielder yielder + ) + { + return new ResultFetcher(limit, yielder); + } + } + /** * Query metrics can only be used within a single thread. Because results can * be paginated into multiple JDBC frames (each frame being processed by a @@ -77,25 +183,46 @@ public class DruidJdbcResultSet implements Closeable * https://github.com/apache/druid/pull/4288 * https://github.com/apache/druid/pull/4415 */ - private final ExecutorService yielderOpenCloseExecutor; + private final ExecutorService queryExecutor; private final DirectStatement stmt; private final long maxRowCount; + private final ResultFetcherFactory fetcherFactory; private State state = State.NEW; private Meta.Signature signature; - private Yielder yielder; - private int offset; + + /** + * The fetcher which reads batches of rows. Holds onto the yielder for a + * query. Maintains the current read offset. + */ + private ResultFetcher fetcher; + + /** + * Future for a fetch that timed out waiting, and should be used again on + * the next fetch request. + */ + private Future fetchFuture; + + /** + * Cached version of the read offset in case the caller asks for the offset + * concurrently with a fetch which may update its own offset. This offset + * is that for the last batch that the client fetched: the fetcher itself + * may be moving to a new offset. + */ + private int nextFetchOffset; public DruidJdbcResultSet( final AbstractDruidJdbcStatement jdbcStatement, final DirectStatement stmt, - final long maxRowCount + final long maxRowCount, + final ResultFetcherFactory fetcherFactory ) { this.stmt = stmt; this.maxRowCount = maxRowCount; - this.yielderOpenCloseExecutor = Execs.singleThreaded( + this.fetcherFactory = fetcherFactory; + this.queryExecutor = Execs.singleThreaded( StringUtils.format( - "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", + "JDBCQueryExecutor-connection-%s-statement-%d", StringUtils.encodeForFormat(jdbcStatement.getConnectionId()), jdbcStatement.getStatementId() ) @@ -107,19 +234,22 @@ public class DruidJdbcResultSet implements Closeable ensure(State.NEW); try { state = State.RUNNING; - final Sequence baseSequence = yielderOpenCloseExecutor.submit(stmt::execute).get().getResults(); - // We can't apply limits greater than Integer.MAX_VALUE, ignore them. - final Sequence retSequence = - maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE - ? baseSequence.limit((int) maxRowCount) - : baseSequence; + // Execute the first step: plan the query and return a sequence to use + // to get values. + final Sequence sequence = queryExecutor.submit(stmt::execute).get().getResults(); - yielder = Yielders.each(retSequence); + // Subsequent fetch steps are done via the async "fetcher". + fetcher = fetcherFactory.newFetcher( + // We can't apply limits greater than Integer.MAX_VALUE, ignore them. + maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int) maxRowCount : Integer.MAX_VALUE, + Yielders.each(sequence) + ); signature = AbstractDruidJdbcStatement.createSignature( stmt.prepareResult(), stmt.query().sql() ); + LOG.debug("Opened result set [%s]", stmt.sqlQueryId()); } catch (ExecutionException e) { throw closeAndPropagateThrowable(e.getCause()); @@ -143,34 +273,61 @@ public class DruidJdbcResultSet implements Closeable public synchronized Meta.Frame nextFrame(final long fetchOffset, final int fetchMaxRowCount) { ensure(State.RUNNING, State.DONE); - Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != offset [%,d]", fetchOffset, offset); + if (fetchOffset != nextFetchOffset) { + throw new IAE( + "Druid can only fetch forward. Requested offset of %,d != current offset %,d", + fetchOffset, + nextFetchOffset + ); + } if (state == State.DONE) { - return new Meta.Frame(fetchOffset, true, Collections.emptyList()); + LOG.debug("EOF at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId()); + return new Meta.Frame(fetcher.offset(), true, Collections.emptyList()); } + final Future future; + if (fetchFuture == null) { + // Not waiting on a batch. Request one now. + fetcher.setBatchSize(fetchMaxRowCount); + future = queryExecutor.submit(fetcher); + } else { + // Last batch took too long. Continue waiting for it. + future = fetchFuture; + fetchFuture = null; + } try { - final List rows = new ArrayList<>(); - while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < fetchOffset + fetchMaxRowCount)) { - rows.add(yielder.get()); - yielder = yielder.next(null); - offset++; - } - - if (yielder.isDone()) { + Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), TimeUnit.MILLISECONDS); + LOG.debug("Fetched batch at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId()); + if (result.done) { state = State.DONE; } - - return new Meta.Frame(fetchOffset, state == State.DONE, rows); + nextFetchOffset = fetcher.offset; + return result; } - catch (Throwable t) { - throw closeAndPropagateThrowable(t); + catch (CancellationException | InterruptedException e) { + // Consider this a failure. + throw closeAndPropagateThrowable(e); + } + catch (ExecutionException e) { + // Fetch threw an error. Unwrap it. + throw closeAndPropagateThrowable(e.getCause()); + } + catch (TimeoutException e) { + LOG.debug("Timeout of batch at offset %,d for result set [%s]", fetchOffset, stmt.sqlQueryId()); + fetchFuture = future; + // Wait timed out. Return 0 rows: the client will try again later. + // We'll wait again on this same fetch next time. + // Note that when the next fetch request comes, it will use the batch + // size set here: any change in size will be ignored for the in-flight batch. + // Changing batch size mid-query is an odd case: it will probably never happen. + return new Meta.Frame(nextFetchOffset, false, Collections.emptyList()); } } public synchronized long getCurrentOffset() { ensure(State.RUNNING, State.DONE); - return offset; + return fetcher.offset; } @GuardedBy("this") @@ -215,14 +372,28 @@ public class DruidJdbcResultSet implements Closeable if (state == State.CLOSED || state == State.FAILED) { return; } + LOG.debug("Closing result set [%s]", stmt.sqlQueryId()); state = State.CLOSED; try { - if (yielder != null) { - Yielder theYielder = this.yielder; - this.yielder = null; + // If a fetch is in progress, wait for it to complete. + if (fetchFuture != null) { + try { + fetchFuture.cancel(true); + fetchFuture.get(); + } + catch (Exception e) { + // Ignore, we're shutting down anyway. + } + finally { + fetchFuture = null; + } + } + if (fetcher != null) { + Yielder theYielder = fetcher.yielder; + fetcher = null; // Put the close last, so any exceptions it throws are after we did the other cleanup above. - yielderOpenCloseExecutor.submit( + queryExecutor.submit( () -> { theYielder.close(); // makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda @@ -230,7 +401,7 @@ public class DruidJdbcResultSet implements Closeable } ).get(); - yielderOpenCloseExecutor.shutdownNow(); + queryExecutor.shutdownNow(); } } catch (RuntimeException e) { diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java index 3b84b7e483b..4c342a46fef 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java @@ -24,6 +24,9 @@ import org.apache.calcite.avatica.Meta; import org.apache.druid.sql.DirectStatement; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; + +import java.util.Map; /** * Represents Druid's version of the JDBC {@code Statement} class: @@ -36,22 +39,27 @@ import org.apache.druid.sql.SqlStatementFactory; public class DruidJdbcStatement extends AbstractDruidJdbcStatement { private final SqlStatementFactory lifecycleFactory; + protected final Map queryContext; public DruidJdbcStatement( final String connectionId, final int statementId, - final SqlStatementFactory lifecycleFactory + final Map queryContext, + final SqlStatementFactory lifecycleFactory, + final ResultFetcherFactory fetcherFactory ) { - super(connectionId, statementId); + super(connectionId, statementId, fetcherFactory); + this.queryContext = queryContext; this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory, "lifecycleFactory"); } public synchronized void execute(SqlQueryPlus queryPlus, long maxRowCount) { closeResultSet(); + queryPlus = queryPlus.withContext(queryContext); DirectStatement stmt = lifecycleFactory.directStatement(queryPlus); - resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE); + resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE, fetcherFactory); try { resultSet.execute(); } diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java index 6ee3de811cd..76efecf68ba 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java @@ -49,6 +49,7 @@ import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import org.apache.druid.sql.calcite.planner.Calcites; import org.joda.time.Interval; @@ -95,7 +96,20 @@ public class DruidMeta extends MetaImpl */ public static T logFailure(T error) { - logFailure(error, error.getMessage()); + if (error instanceof NoSuchConnectionException) { + NoSuchConnectionException ex = (NoSuchConnectionException) error; + logFailure(error, "No such connection: %s", ex.getConnectionId()); + } else if (error instanceof NoSuchStatementException) { + NoSuchStatementException ex = (NoSuchStatementException) error; + logFailure( + error, + "No such statement: %s, %d", + ex.getStatementHandle().connectionId, + ex.getStatementHandle().id + ); + } else { + logFailure(error, error.getMessage()); + } return error; } @@ -115,6 +129,7 @@ public class DruidMeta extends MetaImpl private final AvaticaServerConfig config; private final List authenticators; private final ErrorHandler errorHandler; + private final ResultFetcherFactory fetcherFactory; /** * Tracks logical connections. @@ -145,7 +160,8 @@ public class DruidMeta extends MetaImpl .setDaemon(true) .build() ), - authMapper.getAuthenticatorChain() + authMapper.getAuthenticatorChain(), + new ResultFetcherFactory(config.getFetchTimeoutMs()) ); } @@ -154,7 +170,8 @@ public class DruidMeta extends MetaImpl final AvaticaServerConfig config, final ErrorHandler errorHandler, final ScheduledExecutorService exec, - final List authenticators + final List authenticators, + final ResultFetcherFactory fetcherFactory ) { super(null); @@ -163,6 +180,7 @@ public class DruidMeta extends MetaImpl this.errorHandler = errorHandler; this.exec = exec; this.authenticators = authenticators; + this.fetcherFactory = fetcherFactory; } @Override @@ -188,11 +206,6 @@ public class DruidMeta extends MetaImpl try { openDruidConnection(ch.id, secret, contextMap); } - catch (NoSuchConnectionException e) { - // Avoid sanitizing Avatica specific exceptions so that the Avatica code - // can rely on them to handle issues in a JDBC-specific way. - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -208,9 +221,6 @@ public class DruidMeta extends MetaImpl druidConnection.close(); } } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -224,9 +234,6 @@ public class DruidMeta extends MetaImpl getDruidConnection(ch.id); return connProps; } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -241,12 +248,10 @@ public class DruidMeta extends MetaImpl public StatementHandle createStatement(final ConnectionHandle ch) { try { - final DruidJdbcStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlStatementFactory); + final DruidJdbcStatement druidStatement = getDruidConnection(ch.id) + .createStatement(sqlStatementFactory, fetcherFactory); return new StatementHandle(ch.id, druidStatement.getStatementId(), null); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -275,15 +280,13 @@ public class DruidMeta extends MetaImpl final DruidJdbcPreparedStatement stmt = getDruidConnection(ch.id).createPreparedStatement( sqlStatementFactory, sqlReq, - maxRowCount + maxRowCount, + fetcherFactory ); stmt.prepare(); LOG.debug("Successfully prepared statement [%s] for execution", stmt.getStatementId()); return new StatementHandle(ch.id, stmt.getStatementId(), stmt.getSignature()); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -315,7 +318,7 @@ public class DruidMeta extends MetaImpl } /** - * Prepares and executes a JDBC {@code Statement} + * Prepares and executes a JDBC {@code Statement}. */ @Override public ExecuteResult prepareAndExecute( @@ -324,26 +327,25 @@ public class DruidMeta extends MetaImpl final long maxRowCount, final int maxRowsInFirstFrame, final PrepareCallback callback - ) throws NoSuchStatementException + ) { - try { // Ignore "callback", this class is designed for use with LocalService which doesn't use it. final DruidJdbcStatement druidStatement = getDruidStatement(statement, DruidJdbcStatement.class); final DruidConnection druidConnection = getDruidConnection(statement.connectionId); - AuthenticationResult authenticationResult = doAuthenticate(druidConnection); - SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql) - .auth(authenticationResult) - .context(druidConnection.sessionContext()) - .build(); - druidStatement.execute(sqlRequest, maxRowCount); - ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame); - LOG.debug("Successfully prepared statement [%s] and started execution", druidStatement.getStatementId()); - return result; - } - // Cannot affect these exceptions as Avatica handles them. - catch (NoSuchConnectionException | NoSuchStatementException e) { - throw e; + + // This method is called directly from the Avatica server: it does not go + // through the connection first. We must lock the connection here to prevent race conditions. + synchronized (druidConnection) { + final AuthenticationResult authenticationResult = doAuthenticate(druidConnection); + final SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql) + .auth(authenticationResult) + .build(); + druidStatement.execute(sqlRequest, maxRowCount); + final ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame); + LOG.debug("Successfully prepared statement [%s] and started execution", druidStatement.getStatementId()); + return result; + } } catch (Throwable t) { throw mapException(t); @@ -357,6 +359,15 @@ public class DruidMeta extends MetaImpl */ private RuntimeException mapException(Throwable t) { + // Don't sanitize or wrap Avatica exceptions: these exceptions + // are handled specially by Avatica to provide SQLState, Error Code + // and other JDBC-specific items. + if (t instanceof AvaticaRuntimeException) { + throw (AvaticaRuntimeException) t; + } + if (t instanceof NoSuchConnectionException) { + throw (NoSuchConnectionException) t; + } // BasicSecurityAuthenticationException is not visible here. String className = t.getClass().getSimpleName(); if (t instanceof ForbiddenException || @@ -365,7 +376,8 @@ public class DruidMeta extends MetaImpl t.getMessage(), ErrorResponse.UNAUTHORIZED_ERROR_CODE, ErrorResponse.UNAUTHORIZED_SQL_STATE, - AvaticaSeverity.ERROR); + AvaticaSeverity.ERROR + ); } // Let Avatica do its default mapping. @@ -425,9 +437,6 @@ public class DruidMeta extends MetaImpl LOG.debug("Fetching next frame from offset %,d with %,d rows for statement [%s]", offset, maxRows, statement.id); return getDruidStatement(statement, AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -450,7 +459,7 @@ public class DruidMeta extends MetaImpl final StatementHandle statement, final List parameterValues, final int maxRowsInFirstFrame - ) throws NoSuchStatementException + ) { try { final DruidJdbcPreparedStatement druidStatement = @@ -462,9 +471,6 @@ public class DruidMeta extends MetaImpl druidStatement.getStatementId()); return result; } - catch (NoSuchStatementException | NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -493,9 +499,6 @@ public class DruidMeta extends MetaImpl druidConnection.closeStatement(h.id); } } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -506,7 +509,7 @@ public class DruidMeta extends MetaImpl final StatementHandle sh, final QueryState state, final long offset - ) throws NoSuchStatementException + ) { try { final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh, AbstractDruidJdbcStatement.class); @@ -521,9 +524,6 @@ public class DruidMeta extends MetaImpl } return !isDone; } - catch (NoSuchStatementException | NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -560,9 +560,6 @@ public class DruidMeta extends MetaImpl return sqlResultSet(ch, sql); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -597,9 +594,6 @@ public class DruidMeta extends MetaImpl return sqlResultSet(ch, sql); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -656,9 +650,6 @@ public class DruidMeta extends MetaImpl return sqlResultSet(ch, sql); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -726,9 +717,6 @@ public class DruidMeta extends MetaImpl return sqlResultSet(ch, sql); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -747,9 +735,6 @@ public class DruidMeta extends MetaImpl return sqlResultSet(ch, sql); } - catch (NoSuchConnectionException e) { - throw e; - } catch (Throwable t) { throw mapException(t); } @@ -851,7 +836,7 @@ public class DruidMeta extends MetaImpl return connection.sync( exec.schedule( () -> { - LOG.debug("Connection[%s] timed out.", connectionId); + LOG.debug("Connection [%s] timed out.", connectionId); closeConnection(new ConnectionHandle(connectionId)); }, new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(), diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 119d10ae4ef..9cf65fe1966 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; @@ -67,6 +68,9 @@ import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.Escalator; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcher; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; @@ -90,11 +94,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.ResultIterator; import java.io.IOException; -import java.net.InetSocketAddress; import java.sql.Array; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -117,30 +122,27 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; /** * Tests the Avatica-based JDBC implementation using JSON serialization. See * {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs * this same set of tests using Protobuf serialization. + * To run this in an IDE, set {@code -Duser.timezone=UTC}. */ public class DruidAvaticaHandlerTest extends CalciteTestBase { - private static final AvaticaServerConfig AVATICA_CONFIG = new AvaticaServerConfig() - { - @Override - public int getMaxConnections() - { - // This must match the number of Connection objects created in testTooManyStatements() - return 4; - } + private static final int CONNECTION_LIMIT = 4; + private static final int STATEMENT_LIMIT = 4; + + private static final AvaticaServerConfig AVATICA_CONFIG; + + static { + AVATICA_CONFIG = new AvaticaServerConfig(); + // This must match the number of Connection objects created in testTooManyStatements() + AVATICA_CONFIG.maxConnections = CONNECTION_LIMIT; + AVATICA_CONFIG.maxStatementsPerConnection = STATEMENT_LIMIT; + } - @Override - public int getMaxStatementsPerConnection() - { - return 4; - } - }; private static final String DUMMY_SQL_QUERY_ID = "dummy"; private static QueryRunnerFactoryConglomerate conglomerate; @@ -162,35 +164,99 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase resourceCloser.close(); } - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public QueryLogHook queryLogHook = QueryLogHook.create(); + private final PlannerConfig plannerConfig = new PlannerConfig(); + private final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); + private final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); private SpecificSegmentsQuerySegmentWalker walker; - private Server server; + private ServerWrapper server; private Connection client; private Connection clientNoTrailingSlash; private Connection superuserClient; private Connection clientLosAngeles; - private DruidMeta druidMeta; - private String url; private Injector injector; private TestRequestLogger testRequestLogger; + private DruidSchemaCatalog makeRootSchema() + { + return CalciteTests.createMockRootSchema( + conglomerate, + walker, + plannerConfig, + CalciteTests.TEST_AUTHORIZER_MAPPER + ); + } + + private class ServerWrapper + { + final DruidMeta druidMeta; + final Server server; + final String url; + + ServerWrapper(final DruidMeta druidMeta) throws Exception + { + this.druidMeta = druidMeta; + server = new Server(0); + server.setHandler(getAvaticaHandler(druidMeta)); + server.start(); + url = StringUtils.format( + "jdbc:avatica:remote:url=%s%s", + server.getURI().toString(), + StringUtils.maybeRemoveLeadingSlash(getJdbcUrlTail()) + ); + } + + public Connection getConnection(String user, String password) throws SQLException + { + return DriverManager.getConnection(url, user, password); + } + + public Connection getUserConnection() throws SQLException + { + return getConnection("regularUser", "druid"); + } + + // Note: though the URL-only form is OK in general, but it will cause tests + // to crash as the mock auth test code needs the user name. + // Use getUserConnection() instead, or create a URL that includes the + // user name and password. + //public Connection getConnection() throws SQLException + //{ + // return DriverManager.getConnection(url); + //} + + public void close() throws Exception + { + druidMeta.closeAllConnections(); + server.stop(); + } + } + + protected String getJdbcUrlTail() + { + return DruidAvaticaJsonHandler.AVATICA_PATH; + } + + // Default implementation is for JSON to allow debugging of tests. + protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta) + { + return new DruidAvaticaJsonHandler( + druidMeta, + new DruidNode("dummy", "dummy", false, 1, null, true, false), + new AvaticaMonitor() + ); + } + @Before public void setUp() throws Exception { walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder()); - final PlannerConfig plannerConfig = new PlannerConfig(); - final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); - final DruidSchemaCatalog rootSchema = - CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, CalciteTests.TEST_AUTHORIZER_MAPPER); + final DruidSchemaCatalog rootSchema = makeRootSchema(); testRequestLogger = new TestRequestLogger(); injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build()) @@ -227,37 +293,34 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase ) .build(); - druidMeta = injector.getInstance(DruidMeta.class); - final AbstractAvaticaHandler handler = this.getAvaticaHandler(druidMeta); - final int port = ThreadLocalRandom.current().nextInt(9999) + 10000; - server = new Server(new InetSocketAddress("127.0.0.1", port)); - server.setHandler(handler); - server.start(); - url = this.getJdbcConnectionString(port); - client = DriverManager.getConnection(url, "regularUser", "druid"); - superuserClient = DriverManager.getConnection(url, CalciteTests.TEST_SUPERUSER_NAME, "druid"); - clientNoTrailingSlash = DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(url), CalciteTests.TEST_SUPERUSER_NAME, "druid"); + DruidMeta druidMeta = injector.getInstance(DruidMeta.class); + server = new ServerWrapper(druidMeta); + client = server.getUserConnection(); + superuserClient = server.getConnection(CalciteTests.TEST_SUPERUSER_NAME, "druid"); + clientNoTrailingSlash = DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(server.url), CalciteTests.TEST_SUPERUSER_NAME, "druid"); final Properties propertiesLosAngeles = new Properties(); propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles"); propertiesLosAngeles.setProperty("user", "regularUserLA"); propertiesLosAngeles.setProperty(BaseQuery.SQL_QUERY_ID, DUMMY_SQL_QUERY_ID); - clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles); + clientLosAngeles = DriverManager.getConnection(server.url, propertiesLosAngeles); } @After public void tearDown() throws Exception { - client.close(); - clientLosAngeles.close(); - clientNoTrailingSlash.close(); - server.stop(); + if (server != null) { + client.close(); + clientLosAngeles.close(); + clientNoTrailingSlash.close(); + server.close(); + client = null; + clientLosAngeles = null; + clientNoTrailingSlash = null; + server = null; + } walker.close(); walker = null; - client = null; - clientLosAngeles = null; - clientNoTrailingSlash = null; - server = null; } @Test @@ -772,19 +835,21 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase @Test public void testTooManyStatements() throws SQLException { - for (int i = 0; i < 4; i++) { + for (int i = 0; i < STATEMENT_LIMIT; i++) { client.createStatement(); } - expectedException.expect(AvaticaClientRuntimeException.class); - expectedException.expectMessage("Too many open statements, limit is 4"); - client.createStatement(); + AvaticaClientRuntimeException ex = Assert.assertThrows( + AvaticaClientRuntimeException.class, + () -> client.createStatement() + ); + Assert.assertTrue(ex.getMessage().contains("Too many open statements, limit is 4")); } @Test public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < STATEMENT_LIMIT * 2; i++) { client.createStatement().close(); } } @@ -863,7 +928,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase ImmutableList.of(ImmutableMap.of("cnt", 6L)), getRows(resultSet) ); - druidMeta.closeAllConnections(); + server.druidMeta.closeAllConnections(); } } @@ -875,70 +940,72 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase superuserClient.createStatement(); clientNoTrailingSlash.createStatement(); - expectedException.expect(AvaticaClientRuntimeException.class); - expectedException.expectMessage("Too many connections"); - - DriverManager.getConnection(url); + AvaticaClientRuntimeException ex = Assert.assertThrows( + AvaticaClientRuntimeException.class, + () -> server.getUserConnection() + ); + Assert.assertTrue(ex.getMessage().contains("Too many connections")); } @Test - public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException + public void testNotTooManyConnectionsWhenTheyAreClosed() throws SQLException { - for (int i = 0; i < 4; i++) { - try (Connection connection = DriverManager.getConnection(url)) { + for (int i = 0; i < CONNECTION_LIMIT * 2; i++) { + try (Connection connection = server.getUserConnection()) { } } } + @Test + public void testConnectionsCloseStatements() throws SQLException + { + for (int i = 0; i < CONNECTION_LIMIT * 2; i++) { + try (Connection connection = server.getUserConnection()) { + // Note: NOT in a try-catch block. Let the connection close the statement + final Statement statement = connection.createStatement(); + + // Again, NOT in a try-catch block: let the statement close the + // result set. + final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS cnt FROM druid.foo"); + Assert.assertTrue(resultSet.next()); + } + } + } + + private SqlStatementFactory makeStatementFactory() + { + return CalciteTests.createSqlStatementFactory( + CalciteTests.createMockSqlEngine(walker, conglomerate), + new PlannerFactory( + makeRootSchema(), + operatorTable, + macroTable, + plannerConfig, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + CalciteTests.getJsonMapper(), + CalciteTests.DRUID_SCHEMA_NAME, + new CalciteRulesManager(ImmutableSet.of()) + ) + ); + } + @Test public void testMaxRowsPerFrame() throws Exception { - final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig() - { - @Override - public int getMaxConnections() - { - return 2; - } + final AvaticaServerConfig config = new AvaticaServerConfig(); + config.maxConnections = 2; + config.maxStatementsPerConnection = STATEMENT_LIMIT; + config.maxRowsPerFrame = 2; - @Override - public int getMaxStatementsPerConnection() - { - return 4; - } - - @Override - public int getMaxRowsPerFrame() - { - return 2; - } - }; - - final PlannerConfig plannerConfig = new PlannerConfig(); - final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); final List frames = new ArrayList<>(); final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame"); - DruidSchemaCatalog rootSchema = - CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER); DruidMeta smallFrameDruidMeta = new DruidMeta( - CalciteTests.createSqlStatementFactory( - CalciteTests.createMockSqlEngine(walker, conglomerate), - new PlannerFactory( - rootSchema, - operatorTable, - macroTable, - plannerConfig, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - CalciteTests.getJsonMapper(), - CalciteTests.DRUID_SCHEMA_NAME, - new CalciteRulesManager(ImmutableSet.of()) - ) - ), - smallFrameConfig, + makeStatementFactory(), + config, new ErrorHandler(new ServerConfig()), exec, - injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain() + injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(), + new ResultFetcherFactory(config.fetchTimeoutMs) ) { @Override @@ -955,13 +1022,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase } }; - final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta); - final int port = ThreadLocalRandom.current().nextInt(9999) + 20000; - Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port)); - smallFrameServer.setHandler(handler); - smallFrameServer.start(); - String smallFrameUrl = this.getJdbcConnectionString(port); - Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid"); + ServerWrapper server = new ServerWrapper(smallFrameDruidMeta); + Connection smallFrameClient = server.getUserConnection(); final ResultSet resultSet = smallFrameClient.createStatement().executeQuery( "SELECT dim1 FROM druid.foo" @@ -980,59 +1042,29 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase rows ); + resultSet.close(); + smallFrameClient.close(); exec.shutdown(); + server.close(); } @Test public void testMinRowsPerFrame() throws Exception { - final int minFetchSize = 1000; - final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig() - { - @Override - public int getMaxConnections() - { - return 2; - } + final AvaticaServerConfig config = new AvaticaServerConfig(); + config.maxConnections = 2; + config.maxStatementsPerConnection = STATEMENT_LIMIT; + config.minRowsPerFrame = 1000; - @Override - public int getMaxStatementsPerConnection() - { - return 4; - } - - @Override - public int getMinRowsPerFrame() - { - return minFetchSize; - } - }; - - final PlannerConfig plannerConfig = new PlannerConfig(); - final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); final List frames = new ArrayList<>(); final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame"); - DruidSchemaCatalog rootSchema = - CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER); DruidMeta smallFrameDruidMeta = new DruidMeta( - CalciteTests.createSqlStatementFactory( - CalciteTests.createMockSqlEngine(walker, conglomerate), - new PlannerFactory( - rootSchema, - operatorTable, - macroTable, - plannerConfig, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - CalciteTests.getJsonMapper(), - CalciteTests.DRUID_SCHEMA_NAME, - new CalciteRulesManager(ImmutableSet.of()) - ) - ), - smallFrameConfig, + makeStatementFactory(), + config, new ErrorHandler(new ServerConfig()), exec, - injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain() + injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(), + new ResultFetcherFactory(config.fetchTimeoutMs) ) { @Override @@ -1043,20 +1075,15 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase ) throws NoSuchStatementException, MissingResultsException { // overriding fetch allows us to track how many frames are processed after the first frame, and also fetch size - Assert.assertEquals(minFetchSize, fetchMaxRowCount); + Assert.assertEquals(config.minRowsPerFrame, fetchMaxRowCount); Frame frame = super.fetch(statement, offset, fetchMaxRowCount); frames.add(frame); return frame; } }; - final AbstractAvaticaHandler handler = this.getAvaticaHandler(smallFrameDruidMeta); - final int port = ThreadLocalRandom.current().nextInt(9999) + 20000; - Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", port)); - smallFrameServer.setHandler(handler); - smallFrameServer.start(); - String smallFrameUrl = this.getJdbcConnectionString(port); - Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, "regularUser", "druid"); + ServerWrapper server = new ServerWrapper(smallFrameDruidMeta); + Connection smallFrameClient = server.getUserConnection(); // use a prepared statement because Avatica currently ignores fetchSize on the initial fetch of a Statement PreparedStatement statement = smallFrameClient.prepareStatement("SELECT dim1 FROM druid.foo"); @@ -1078,7 +1105,10 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase rows ); + resultSet.close(); + smallFrameClient.close(); exec.shutdown(); + server.close(); } @Test @@ -1546,24 +1576,124 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase Assert.fail("Test failed, did not get SQLException"); } - // Default implementation is for JSON to allow debugging of tests. - protected String getJdbcConnectionString(final int port) + private static class TestResultFetcher extends ResultFetcher { - return StringUtils.format( - "jdbc:avatica:remote:url=http://127.0.0.1:%d%s", - port, - DruidAvaticaJsonHandler.AVATICA_PATH - ); + public TestResultFetcher(int limit, Yielder yielder) + { + super(limit, yielder); + } + + @Override + public Meta.Frame call() + { + try { + if (offset() == 0) { + System.out.println("Taking a nap now..."); + Thread.sleep(3000); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + return super.call(); + } } - // Default implementation is for JSON to allow debugging of tests. - protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta) + /** + * Test the async aspect of the Avatica implementation. The fetch of the + * first batch takes 3 seconds (due to a sleep). However, the client will + * wait only 1 second. So, we should get ~3 empty batches before we get + * the first batch with rows. + */ + @Test + public void testAsync() throws Exception { - return new DruidAvaticaJsonHandler( - druidMeta, - new DruidNode("dummy", "dummy", false, 1, null, true, false), - new AvaticaMonitor() - ); + final AvaticaServerConfig config = new AvaticaServerConfig(); + config.maxConnections = CONNECTION_LIMIT; + config.maxStatementsPerConnection = STATEMENT_LIMIT; + config.maxRowsPerFrame = 2; + config.fetchTimeoutMs = 1000; + + final List frames = new ArrayList<>(); + final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testMaxRowsPerFrame"); + DruidMeta druidMeta = new DruidMeta( + makeStatementFactory(), + config, + new ErrorHandler(new ServerConfig()), + exec, + injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(), + new ResultFetcherFactory(config.fetchTimeoutMs) { + @Override + public ResultFetcher newFetcher( + final int limit, + final Yielder yielder + ) + { + return new TestResultFetcher(limit, yielder); + } + } + ) + { + @Override + public Frame fetch( + final StatementHandle statement, + final long offset, + final int fetchMaxRowCount + ) throws NoSuchStatementException, MissingResultsException + { + Frame frame = super.fetch(statement, offset, fetchMaxRowCount); + frames.add(frame); + return frame; + } + }; + + ServerWrapper server = new ServerWrapper(druidMeta); + try (Connection conn = server.getUserConnection()) { + + // Test with plain JDBC + try (ResultSet resultSet = conn.createStatement().executeQuery( + "SELECT dim1 FROM druid.foo")) { + List> rows = getRows(resultSet); + Assert.assertEquals(6, rows.size()); + Assert.assertTrue(frames.size() > 3); + + // There should be at least one empty frame due to timeout + Assert.assertFalse(frames.get(0).rows.iterator().hasNext()); + } + } + + testWithJDBI(server.url); + + exec.shutdown(); + server.close(); + } + + // Test the async feature using DBI, as used internally in Druid. + // Ensures that DBI knows how to handle empty batches (which should, + // in reality, but handled at the JDBC level below DBI.) + private void testWithJDBI(String baseUrl) + { + String url = baseUrl + "?user=regularUser&password=druid" + getJdbcUrlTail(); + System.out.println(url); + DBI dbi = new DBI(url); + Handle handle = dbi.open(); + try { + ResultIterator> iter = handle + .createQuery("SELECT __time, dim1 FROM druid.foo") + .map((index, row, ctx) -> new Pair<>(row.getLong(1), row.getString(2))) + .iterator(); + int count = 0; + while (iter.hasNext()) { + Pair row = iter.next(); + Assert.assertNotNull(row.lhs); + Assert.assertNotNull(row.rhs); + count++; + } + Assert.assertEquals(6, count); + } + finally { + handle.close(); + } } private static List> getRows(final ResultSet resultSet) throws SQLException diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java index af447f6c138..bbcb0dba928 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java @@ -26,11 +26,10 @@ import org.apache.druid.server.DruidNode; public class DruidAvaticaProtobufHandlerTest extends DruidAvaticaHandlerTest { @Override - protected String getJdbcConnectionString(final int port) + protected String getJdbcUrlTail() { return StringUtils.format( - "jdbc:avatica:remote:url=http://127.0.0.1:%d%s;serialization=protobuf", - port, + "%s;serialization=protobuf", DruidAvaticaProtobufHandler.AVATICA_PATH ); } diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java index e5dc4a662ac..f9b0718d1e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java @@ -35,6 +35,7 @@ import org.apache.druid.server.security.AllowAllAuthenticator; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; @@ -138,7 +139,9 @@ public class DruidStatementTest extends CalciteTestBase return new DruidJdbcStatement( "", 0, - sqlStatementFactory + Collections.emptyMap(), + sqlStatementFactory, + new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS) ); } @@ -517,7 +520,8 @@ public class DruidStatementTest extends CalciteTestBase "", 0, sqlStatementFactory.preparedStatement(queryPlus), - Long.MAX_VALUE + Long.MAX_VALUE, + new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS) ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 0370ad72a0d..fc0a845ca9b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -185,19 +185,19 @@ public class CalciteTests public Authorizer getAuthorizer(String name) { return (authenticationResult, resource, action) -> { - if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) { + if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) { return Access.OK; } switch (resource.getType()) { case ResourceType.DATASOURCE: - if (resource.getName().equals(FORBIDDEN_DATASOURCE)) { + if (FORBIDDEN_DATASOURCE.equals(resource.getName())) { return new Access(false); } else { return Access.OK; } case ResourceType.VIEW: - if (resource.getName().equals("forbiddenView")) { + if ("forbiddenView".equals(resource.getName())) { return new Access(false); } else { return Access.OK;