diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
index a310e567560..c5bc20f45f2 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
*/
public class Execs
{
-
/**
* Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks.
*/
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cb826ef348a..6e92076eb7b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC client `Statement.setFetchSize` method. The value for this property must greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than `minRowsPerFrame`, Druid uses the minimum value of the two. For handling queries which produce results with a large number of rows, you can increase this value to reduce the number of fetches required to completely transfer the result set.|100|
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|4|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
+|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. When a request for the next batch of data takes longer than this time, Druid returns an empty result set, causing the client to poll again. This avoids HTTP timeouts for long-running queries. The default of 5 sec. is good for most cases. |5000|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index fa3b52669b6..75b13a39f1f 100644
--- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -402,7 +402,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
HttpServletResponse response
) throws ServletException, IOException
{
- // Just call the superclass service method. Overriden in tests.
+ // Just call the superclass service method. Overridden in tests.
super.service(request, response);
}
diff --git a/sql/pom.xml b/sql/pom.xml
index bbf2eb049ed..28f7eecffc4 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -247,9 +247,13 @@
org.mockitomockito-core
- ${mockito.version}test
+
+ org.jdbi
+ jdbi
+ test
+
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
index 697ad1ca172..1992c1620b1 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PrepareResult;
@@ -56,16 +57,19 @@ public abstract class AbstractDruidJdbcStatement implements Closeable
protected final String connectionId;
protected final int statementId;
+ protected final ResultFetcherFactory fetcherFactory;
protected Throwable throwable;
protected DruidJdbcResultSet resultSet;
public AbstractDruidJdbcStatement(
final String connectionId,
- final int statementId
+ final int statementId,
+ final ResultFetcherFactory fetcherFactory
)
{
this.connectionId = Preconditions.checkNotNull(connectionId, "connectionId");
this.statementId = statementId;
+ this.fetcherFactory = fetcherFactory;
}
protected static Meta.Signature createSignature(
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
index e931c3a289e..a5215c19c08 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
@@ -30,6 +30,7 @@ class AvaticaServerConfig
public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;
+ public static int DEFAULT_FETCH_TIMEOUT_MS = 5000;
@JsonProperty
public int maxConnections = DEFAULT_MAX_CONNECTIONS;
@@ -46,6 +47,17 @@ class AvaticaServerConfig
@JsonProperty
public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;
+ /**
+ * The maximum amount of time to wait per-fetch for the next result set.
+ * If a query takes longer than this amount of time, then the fetch will
+ * return 0 rows, without EOF, and the client will automatically try
+ * another fetch. The result is an async protocol that avoids network
+ * timeouts for long-running queries, especially those that take a long
+ * time to deliver a first batch of results.
+ */
+ @JsonProperty
+ public int fetchTimeoutMs = DEFAULT_FETCH_TIMEOUT_MS;
+
public int getMaxConnections()
{
return maxConnections;
@@ -77,4 +89,9 @@ class AvaticaServerConfig
}
return minRowsPerFrame;
}
+
+ public int getFetchTimeoutMs()
+ {
+ return fetchTimeoutMs;
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
index e2d223d1266..4f1a5818bf4 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
{
- public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica";
+ public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/";
@Inject
public DruidAvaticaJsonHandler(
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
index 50c54ad27ac..a15efadda6f 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
{
- public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH = "/druid/v2/sql/avatica-protobuf";
+ public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + "/";
@Inject
public DruidAvaticaProtobufHandler(
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
index 23f1a222dd0..5aa3f422b7f 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
@@ -24,8 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.sql.PreparedStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import java.util.Collections;
import java.util.Map;
@@ -37,6 +39,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Connection tracking for {@link DruidMeta}. Thread-safe.
+ *
+ * Lock is the instance itself. Used here to protect two members, and in
+ * other code when we must resolve the connection after resolving the statement.
+ * The lock prevents closing the connection concurrently with an operation on
+ * a statement for that connection.
*/
public class DruidConnection
{
@@ -56,13 +63,12 @@ public class DruidConnection
private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference> timeoutFuture = new AtomicReference<>();
- // Typically synchronized by connectionLock, except in one case: the onClose function passed
+ // Typically synchronized by this instance, except in one case: the onClose function passed
// into DruidStatements contained by the map.
- @GuardedBy("connectionLock")
+ @GuardedBy("this")
private final ConcurrentMap statements = new ConcurrentHashMap<>();
- private final Object connectionLock = new Object();
- @GuardedBy("connectionLock")
+ @GuardedBy("this")
private boolean open = true;
public DruidConnection(
@@ -93,13 +99,14 @@ public class DruidConnection
return userSecret;
}
- public DruidJdbcStatement createStatement(
- final SqlStatementFactory sqlStatementFactory
+ public synchronized DruidJdbcStatement createStatement(
+ final SqlStatementFactory sqlStatementFactory,
+ final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
- synchronized (connectionLock) {
+ synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements.
@@ -114,7 +121,9 @@ public class DruidConnection
final DruidJdbcStatement statement = new DruidJdbcStatement(
connectionId,
statementId,
- sqlStatementFactory
+ sessionContext,
+ sqlStatementFactory,
+ fetcherFactory
);
statements.put(statementId, statement);
@@ -123,15 +132,16 @@ public class DruidConnection
}
}
- public DruidJdbcPreparedStatement createPreparedStatement(
+ public synchronized DruidJdbcPreparedStatement createPreparedStatement(
final SqlStatementFactory sqlStatementFactory,
final SqlQueryPlus sqlQueryPlus,
- final long maxRowCount
+ final long maxRowCount,
+ final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
- synchronized (connectionLock) {
+ synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements.
@@ -143,11 +153,15 @@ public class DruidConnection
}
@SuppressWarnings("GuardedBy")
+ final PreparedStatement statement = sqlStatementFactory.preparedStatement(
+ sqlQueryPlus.withContext(sessionContext)
+ );
final DruidJdbcPreparedStatement jdbcStmt = new DruidJdbcPreparedStatement(
connectionId,
statementId,
- sqlStatementFactory.preparedStatement(sqlQueryPlus),
- maxRowCount
+ statement,
+ maxRowCount,
+ fetcherFactory
);
statements.put(statementId, jdbcStmt);
@@ -156,17 +170,15 @@ public class DruidConnection
}
}
- public AbstractDruidJdbcStatement getStatement(final int statementId)
+ public synchronized AbstractDruidJdbcStatement getStatement(final int statementId)
{
- synchronized (connectionLock) {
- return statements.get(statementId);
- }
+ return statements.get(statementId);
}
public void closeStatement(int statementId)
{
AbstractDruidJdbcStatement stmt;
- synchronized (connectionLock) {
+ synchronized (this) {
stmt = statements.remove(statementId);
}
if (stmt != null) {
@@ -180,34 +192,30 @@ public class DruidConnection
*
* @return true if closed
*/
- public boolean closeIfEmpty()
+ public synchronized boolean closeIfEmpty()
{
- synchronized (connectionLock) {
- if (statements.isEmpty()) {
- close();
- return true;
- } else {
- return false;
- }
+ if (statements.isEmpty()) {
+ close();
+ return true;
+ } else {
+ return false;
}
}
- public void close()
+ public synchronized void close()
{
- synchronized (connectionLock) {
- // Copy statements before iterating because statement.close() modifies it.
- for (AbstractDruidJdbcStatement statement : ImmutableList.copyOf(statements.values())) {
- try {
- statement.close();
- }
- catch (Exception e) {
- LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId());
- }
+ // Copy statements before iterating because statement.close() modifies it.
+ for (AbstractDruidJdbcStatement statement : ImmutableList.copyOf(statements.values())) {
+ try {
+ statement.close();
+ }
+ catch (Exception e) {
+ LOG.warn("Connection [%s] failed to close statement [%s]!", connectionId, statement.getStatementId());
}
-
- LOG.debug("Connection [%s] closed.", connectionId);
- open = false;
}
+
+ LOG.debug("Connection [%s] closed.", connectionId);
+ open = false;
}
public DruidConnection sync(final Future> newTimeoutFuture)
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
index 3cd608addb7..dcd599c5428 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.PreparedStatement;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import java.util.List;
@@ -50,10 +51,11 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
final String connectionId,
final int statementId,
final PreparedStatement stmt,
- final long maxRowCount
+ final long maxRowCount,
+ final ResultFetcherFactory fetcherFactory
)
{
- super(connectionId, statementId);
+ super(connectionId, statementId, fetcherFactory);
this.sqlStatement = stmt;
this.maxRowCount = maxRowCount;
}
@@ -98,7 +100,7 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
closeResultSet();
try {
DirectStatement directStmt = sqlStatement.execute(parameters);
- resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount);
+ resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount, fetcherFactory);
resultSet.execute();
}
// Failure to execute does not close the prepared statement.
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 2b494015525..1eb0d1aa5ef 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -22,20 +22,27 @@ package org.apache.druid.sql.avatica;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.calcite.avatica.Meta;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.DirectStatement;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Druid's server-side representation of a JDBC result set. At most one
@@ -59,6 +66,105 @@ import java.util.concurrent.ExecutorService;
*/
public class DruidJdbcResultSet implements Closeable
{
+ private static final Logger LOG = new Logger(DruidJdbcResultSet.class);
+
+ /**
+ * Asynchronous result fetcher. JDBC operates via REST, which is subject to
+ * a timeout if a query takes too long to respond. Fortunately, JDBC uses a
+ * batched API, and is perfectly happy to get an empty batch. This class
+ * runs in a separate thread to fetch a batch. If the fetch takes too long,
+ * the JDBC request thread will time out waiting, will return an empty batch
+ * to the client, and will remember the fetch for use in the next fetch
+ * request. The result is that the time it takes to produce results for long
+ * running queries is decoupled from the HTTP timeout.
+ */
+ public static class ResultFetcher implements Callable
+ {
+ private final int limit;
+ private int batchSize;
+ private int offset;
+ private Yielder