diff --git a/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java b/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java index c8ae0db62fe..a1ef27e731f 100644 --- a/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java +++ b/sql/src/main/java/io/druid/sql/avatica/DruidStatement.java @@ -22,6 +22,7 @@ package io.druid.sql.avatica; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import io.druid.concurrent.Execs; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -42,6 +43,7 @@ import java.sql.DatabaseMetaData; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Statement handle for {@link DruidMeta}. Thread-safe. @@ -64,6 +66,23 @@ public class DruidStatement implements Closeable private final Runnable onClose; private final Object lock = new Object(); + /** + * Query metrics can only be used within a single thread. Because results can be paginated into multiple + * JDBC frames (each frame being processed by a potentially different thread), the thread that closes the yielder + * (resulting in a QueryMetrics emit() call) may not be the same thread that created the yielder (which initializes + * DefaultQueryMetrics with the current thread as the owner). Create and close the yielder with this + * single-thread executor to prevent this from happening. + * + * The thread owner check in DefaultQueryMetrics is more aggressive than needed for this specific JDBC case, since + * the JDBC frames are processed sequentially. If the thread owner check is changed/loosened to permit this use case, + * we would not need to use this executor. + * + * See discussion at: + * https://github.com/druid-io/druid/pull/4288 + * https://github.com/druid-io/druid/pull/4415 + */ + private final ExecutorService yielderOpenCloseExecutor; + private State state = State.NEW; private String query; private long maxRowCount; @@ -83,6 +102,9 @@ public class DruidStatement implements Closeable this.statementId = statementId; this.queryContext = queryContext == null ? ImmutableMap.of() : queryContext; this.onClose = Preconditions.checkNotNull(onClose, "onClose"); + this.yielderOpenCloseExecutor = Execs.singleThreaded( + String.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", connectionId, statementId) + ); } public static List createColumnMetaData(final RelDataType rowType) @@ -166,7 +188,9 @@ public class DruidStatement implements Closeable ensure(State.PREPARED); try { - final Sequence baseSequence = plannerResult.run(); + final Sequence baseSequence = yielderOpenCloseExecutor.submit( + () -> plannerResult.run() + ).get(); // We can't apply limits greater than Integer.MAX_VALUE, ignore them. final Sequence retSequence = @@ -184,7 +208,7 @@ public class DruidStatement implements Closeable catch (Throwable t1) { t.addSuppressed(t1); } - throw t; + throw Throwables.propagate(t); } return this; @@ -286,7 +310,15 @@ public class DruidStatement implements Closeable this.yielder = null; // Put the close last, so any exceptions it throws are after we did the other cleanup above. - theYielder.close(); + yielderOpenCloseExecutor.submit( + () -> { + theYielder.close(); + // makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda + return null; + } + ).get(); + + yielderOpenCloseExecutor.shutdownNow(); } } catch (Throwable t) {