Create/close yielder in same thread for JDBC queries (#4415)

* Create/close yielder in same thread for JDBC queries

* PR comments

* More PR comments

* Add connectionId to DruidStatement executor
This commit is contained in:
Jonathan Wei 2017-06-16 16:50:33 -07:00 committed by Gian Merlino
parent f68a0693f3
commit cc815eec81
1 changed files with 35 additions and 3 deletions

View File

@ -22,6 +22,7 @@ package io.druid.sql.avatica;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.concurrent.Execs;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
@ -42,6 +43,7 @@ import java.sql.DatabaseMetaData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* Statement handle for {@link DruidMeta}. Thread-safe.
@ -64,6 +66,23 @@ public class DruidStatement implements Closeable
private final Runnable onClose;
private final Object lock = new Object();
/**
* Query metrics can only be used within a single thread. Because results can be paginated into multiple
* JDBC frames (each frame being processed by a potentially different thread), the thread that closes the yielder
* (resulting in a QueryMetrics emit() call) may not be the same thread that created the yielder (which initializes
* DefaultQueryMetrics with the current thread as the owner). Create and close the yielder with this
* single-thread executor to prevent this from happening.
*
* The thread owner check in DefaultQueryMetrics is more aggressive than needed for this specific JDBC case, since
* the JDBC frames are processed sequentially. If the thread owner check is changed/loosened to permit this use case,
* we would not need to use this executor.
*
* See discussion at:
* https://github.com/druid-io/druid/pull/4288
* https://github.com/druid-io/druid/pull/4415
*/
private final ExecutorService yielderOpenCloseExecutor;
private State state = State.NEW;
private String query;
private long maxRowCount;
@ -83,6 +102,9 @@ public class DruidStatement implements Closeable
this.statementId = statementId;
this.queryContext = queryContext == null ? ImmutableMap.of() : queryContext;
this.onClose = Preconditions.checkNotNull(onClose, "onClose");
this.yielderOpenCloseExecutor = Execs.singleThreaded(
String.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", connectionId, statementId)
);
}
public static List<ColumnMetaData> createColumnMetaData(final RelDataType rowType)
@ -166,7 +188,9 @@ public class DruidStatement implements Closeable
ensure(State.PREPARED);
try {
final Sequence<Object[]> baseSequence = plannerResult.run();
final Sequence<Object[]> baseSequence = yielderOpenCloseExecutor.submit(
() -> plannerResult.run()
).get();
// We can't apply limits greater than Integer.MAX_VALUE, ignore them.
final Sequence<Object[]> retSequence =
@ -184,7 +208,7 @@ public class DruidStatement implements Closeable
catch (Throwable t1) {
t.addSuppressed(t1);
}
throw t;
throw Throwables.propagate(t);
}
return this;
@ -286,7 +310,15 @@ public class DruidStatement implements Closeable
this.yielder = null;
// Put the close last, so any exceptions it throws are after we did the other cleanup above.
theYielder.close();
yielderOpenCloseExecutor.submit(
() -> {
theYielder.close();
// makes this a Callable instead of Runnable so we don't need to catch exceptions inside the lambda
return null;
}
).get();
yielderOpenCloseExecutor.shutdownNow();
}
}
catch (Throwable t) {