fix issue with jdbc and query metrics (#13608)

* fix issue with metrics emitting and jdbc results by getting yielder from query processing thread

* more better
This commit is contained in:
Clint Wylie 2022-12-21 19:32:53 -08:00 committed by GitHub
parent df55768535
commit fd63e5a514
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 2 additions and 4 deletions

View File

@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
@ -237,13 +236,12 @@ public class DruidJdbcResultSet implements Closeable
// Execute the first step: plan the query and return a sequence to use // Execute the first step: plan the query and return a sequence to use
// to get values. // to get values.
final Sequence<Object[]> sequence = queryExecutor.submit(stmt::execute).get().getResults(); final Yielder<Object[]> yielder = queryExecutor.submit(() -> Yielders.each(stmt.execute().getResults())).get();
// Subsequent fetch steps are done via the async "fetcher". // Subsequent fetch steps are done via the async "fetcher".
fetcher = fetcherFactory.newFetcher( fetcher = fetcherFactory.newFetcher(
// We can't apply limits greater than Integer.MAX_VALUE, ignore them. // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int) maxRowCount : Integer.MAX_VALUE, maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int) maxRowCount : Integer.MAX_VALUE,
Yielders.each(sequence) yielder
); );
signature = AbstractDruidJdbcStatement.createSignature( signature = AbstractDruidJdbcStatement.createSignature(
stmt.prepareResult(), stmt.prepareResult(),