Light refactor of the heavily refactored statement classes (#12909)

Reflects lessons learned from working with consumers of
the new code.
This commit is contained in:
Paul Rogers 2022-08-18 14:01:06 -07:00 committed by GitHub
parent 7fb1153bba
commit eb902375a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 305 additions and 105 deletions

View File

@ -30,7 +30,6 @@ import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import java.io.Closeable;
import java.util.Set;
@ -160,20 +159,6 @@ public abstract class AbstractStatement implements Closeable
);
}
/**
* Plan the query, which also produces the sequence that runs
* the query.
*/
protected PlannerResult plan(DruidPlanner planner)
{
try {
return planner.plan();
}
catch (ValidationException e) {
throw new SqlPlanningException(e);
}
}
/**
* Return the datasource and table resources for this
* statement.
@ -188,7 +173,7 @@ public abstract class AbstractStatement implements Closeable
return fullResourceActions;
}
public SqlQueryPlus sqlRequest()
public SqlQueryPlus query()
{
return queryPlus;
}
@ -220,4 +205,17 @@ public abstract class AbstractStatement implements Closeable
public void closeQuietly()
{
}
/**
* Convenience method to close the statement and report an error
* associated with the statement. Same as calling:{@code
* stmt.reporter().failed(e);
* stmt.close();
* }
*/
public void closeWithError(Throwable e)
{
reporter.failed(e);
close();
}
}

View File

@ -19,15 +19,20 @@
package org.apache.druid.sql;
import com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@ -62,9 +67,91 @@ public class DirectStatement extends AbstractStatement implements Cancelable
{
private static final Logger log = new Logger(DirectStatement.class);
/**
* Represents the execution plan for a query with the ability to run
* that plan (once).
*/
public class ResultSet implements Cancelable
{
private final PlannerResult plannerResult;
public ResultSet(PlannerResult plannerResult)
{
this.plannerResult = plannerResult;
}
public SqlQueryPlus query()
{
return queryPlus;
}
/**
* Convenience method for the split plan/run case to ensure that the statement
* can, in fact, be run.
*/
public boolean runnable()
{
return plannerResult != null && plannerResult.runnable();
}
/**
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
public Sequence<Object[]> run()
{
try {
// Check cancellation. Required for SqlResourceTest to work.
transition(State.RAN);
return plannerResult.run();
}
catch (RuntimeException e) {
reporter.failed(e);
throw e;
}
}
public SqlRowTransformer createRowTransformer()
{
return new SqlRowTransformer(plannerContext.getTimeZone(), plannerResult.rowType());
}
public SqlExecutionReporter reporter()
{
return reporter;
}
@Override
public Set<ResourceAction> resources()
{
return DirectStatement.this.resources();
}
@Override
public void cancel()
{
DirectStatement.this.cancel();
}
public void close()
{
DirectStatement.this.close();
}
}
private enum State
{
START,
PREPARED,
RAN,
CANCELLED,
FAILED,
CLOSED
}
protected PrepareResult prepareResult;
protected PlannerResult plannerResult;
private volatile boolean canceled;
protected ResultSet resultSet;
private volatile State state = State.START;
public DirectStatement(
final SqlToolbox lifecycleToolbox,
@ -84,7 +171,21 @@ public class DirectStatement extends AbstractStatement implements Cancelable
}
/**
* Direct execution of a query, including:
* Convenience method to perform Direct execution of a query. Does both
* the {@link #plan()} step and the {@link ResultSet#run()} step.
*
* @return sequence which delivers query results
*/
public Sequence<Object[]> execute()
{
return plan().run();
}
/**
* Prepares and plans a query for execution, returning a result set to
* execute the query. In Druid, prepare and plan are different: prepare provides
* information about the query, but plan does the "real" preparation to create
* an actual executable plan.
* <ul>
* <li>Create the planner.</li>
* <li>Parse the statement.</li>
@ -93,16 +194,14 @@ public class DirectStatement extends AbstractStatement implements Cancelable
* <li>Validate the query against the Druid catalog.</li>
* <li>Authorize access to the resources which the query needs.</li>
* <li>Plan the query.</li>
* <li>Return a {@link Sequence} which executes the query and returns results.</li>
* </ul>
*
* This method is called from the request thread; results are read in the
* response thread.
*
* @return sequence which delivers query results
* Call {@link ResultSet#run()} to run the resulting plan.
*/
public Sequence<Object[]> execute()
public ResultSet plan()
{
if (state != State.START) {
throw new ISE("Can plan a query only once.");
}
try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner(
sqlToolbox.engine,
queryPlus.sql(),
@ -114,49 +213,59 @@ public class DirectStatement extends AbstractStatement implements Cancelable
// Tests cancel during this call; real clients might do so if the plan
// or execution prep stages take too long for some unexpected reason.
sqlToolbox.sqlLifecycleManager.add(sqlQueryId(), this);
checkCanceled();
plannerResult = plan(planner);
transition(State.PREPARED);
resultSet = createResultSet(createPlan(planner));
prepareResult = planner.prepareResult();
return doExecute();
// Double check needed by SqlResourceTest
transition(State.PREPARED);
return resultSet;
}
catch (RuntimeException e) {
state = State.FAILED;
reporter.failed(e);
throw e;
}
}
/**
* Plan the query, which also produces the sequence that runs
* the query.
*/
@VisibleForTesting
protected PlannerResult createPlan(DruidPlanner planner)
{
try {
return planner.plan();
}
catch (ValidationException e) {
throw new SqlPlanningException(e);
}
}
/**
* Wrapper around result set creation for the sole purpose of tests which
* inject failures.
*/
@VisibleForTesting
protected ResultSet createResultSet(PlannerResult plannerResult)
{
return new ResultSet(plannerResult);
}
public PrepareResult prepareResult()
{
return prepareResult;
}
/**
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
protected Sequence<Object[]> doExecute()
{
// Check cancellation here and not in execute() above:
// required for SqlResourceTest to work.
checkCanceled();
try {
return plannerResult.run();
}
catch (RuntimeException e) {
reporter.failed(e);
throw e;
}
}
/**
* Checks for cancellation. As it turns out, this is really just a test-time
* check: an actual client can't cancel the query until the query reports
* a query ID, which won't happen until after the {@link #execute())}
* call.
*/
private void checkCanceled()
private void transition(State newState)
{
if (canceled) {
if (state == State.CANCELLED) {
throw new QueryInterruptedException(
QueryInterruptedException.QUERY_CANCELED,
StringUtils.format("Query is canceled [%s]", sqlQueryId()),
@ -164,12 +273,16 @@ public class DirectStatement extends AbstractStatement implements Cancelable
null
);
}
state = newState;
}
@Override
public void cancel()
{
canceled = true;
if (state == State.CLOSED) {
return;
}
state = State.CANCELLED;
final CopyOnWriteArrayList<String> nativeQueryIds = plannerContext.getNativeQueryIds();
for (String nativeQueryId : nativeQueryIds) {
@ -177,4 +290,22 @@ public class DirectStatement extends AbstractStatement implements Cancelable
sqlToolbox.queryScheduler.cancelQuery(nativeQueryId);
}
}
@Override
public void close()
{
if (state != State.START && state != State.CLOSED) {
super.close();
state = State.CLOSED;
}
}
@Override
public void closeWithError(Throwable e)
{
if (state != State.START && state != State.CLOSED) {
super.closeWithError(e);
state = State.CLOSED;
}
}
}

View File

@ -68,9 +68,4 @@ public class HttpStatement extends DirectStatement
sqlToolbox.plannerFactory.getAuthorizerMapper()
);
}
public SqlRowTransformer createRowTransformer()
{
return new SqlRowTransformer(plannerContext.getTimeZone(), plannerResult.rowType());
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.sql;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.BadQueryException;
@ -67,6 +68,11 @@ public class SqlPlanningException extends BadQueryException
this(PlanningError.VALIDATION_ERROR, e.getMessage());
}
public SqlPlanningException(CalciteContextException e)
{
this(PlanningError.VALIDATION_ERROR, e.getMessage());
}
public SqlPlanningException(PlanningError planningError, String errorMessage)
{
this(planningError.errorCode, errorMessage, planningError.errorClass);

View File

@ -65,7 +65,7 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement
PrepareResult prepareResult = sqlStatement.prepare();
signature = createSignature(
prepareResult,
sqlStatement.sqlRequest().sql()
sqlStatement.query().sql()
);
state = State.PREPARED;
}

View File

@ -114,7 +114,7 @@ public class DruidJdbcResultSet implements Closeable
yielder = Yielders.each(retSequence);
signature = AbstractDruidJdbcStatement.createSignature(
stmt.prepareResult(),
stmt.sqlRequest().sql()
stmt.query().sql()
);
}
catch (ExecutionException e) {

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import java.util.concurrent.atomic.AtomicBoolean;
@ -44,6 +45,11 @@ public class PlannerResult
this.rowType = rowType;
}
public boolean runnable()
{
return !didRun.get();
}
/**
* Run the query
*/
@ -51,7 +57,7 @@ public class PlannerResult
{
if (!didRun.compareAndSet(false, true)) {
// Safety check.
throw new IllegalStateException("Cannot run more than once");
throw new ISE("Cannot run more than once");
}
return resultsSupplier.get();
}

View File

@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlExecutionReporter;
import org.apache.druid.sql.SqlLifecycleManager;
@ -67,6 +68,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -135,8 +137,9 @@ public class SqlResource
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
final Sequence<Object[]> sequence = stmt.execute();
final SqlRowTransformer rowTransformer = stmt.createRowTransformer();
ResultSet resultSet = stmt.plan();
final Sequence<Object[]> sequence = resultSet.run();
final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
final Yielder<Object[]> yielder0 = Yielders.each(sequence);
try {

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -45,6 +46,7 @@ import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.SqlPlanningException.PlanningError;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@ -68,6 +70,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -75,6 +78,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SqlStatementTest
@ -213,10 +219,53 @@ public class SqlStatementTest
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
List<Object[]> results = stmt.execute().toList();
ResultSet resultSet = stmt.plan();
assertTrue(resultSet.runnable());
List<Object[]> results = resultSet.run().toList();
assertEquals(1, results.size());
assertEquals(6L, results.get(0)[0]);
assertEquals("foo", results.get(0)[1]);
assertSame(stmt.reporter(), resultSet.reporter());
assertSame(stmt.resources(), resultSet.resources());
assertSame(stmt.query(), resultSet.query());
assertFalse(resultSet.runnable());
resultSet.close();
stmt.close();
}
@Test
public void testDirectPlanTwice()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
stmt.plan();
try {
stmt.plan();
fail();
}
catch (ISE e) {
stmt.closeWithError(e);
}
}
@Test
public void testDirectExecTwice()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
CalciteTests.REGULAR_USER_AUTH_RESULT);
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
ResultSet resultSet = stmt.plan();
resultSet.run();
try {
resultSet.run();
fail();
}
catch (ISE e) {
stmt.closeWithError(e);
}
}
@Test
@ -352,7 +401,7 @@ public class SqlStatementTest
// Prepared statements: using a prepare/execute model.
@Test
public void testJdbcHappyPath()
public void testPreparedHappyPath()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
@ -381,7 +430,7 @@ public class SqlStatementTest
}
@Test
public void testJdbcSyntaxError()
public void testPrepareSyntaxError()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS",
@ -398,7 +447,7 @@ public class SqlStatementTest
}
@Test
public void testJdbcValidationError()
public void testPrepareValidationError()
{
SqlQueryPlus sqlReq = queryPlus(
"SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus",
@ -415,7 +464,7 @@ public class SqlStatementTest
}
@Test
public void testJdbcPermissionError()
public void testPreparePermissionError()
{
SqlQueryPlus sqlReq = queryPlus(
"select count(*) from forbiddenDatasource",
@ -442,7 +491,7 @@ public class SqlStatementTest
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
Map<String, Object> context = stmt.sqlRequest().context().getMergedParams();
Map<String, Object> context = stmt.query().context().getMergedParams();
Assert.assertEquals(2, context.size());
// should contain only query id, not bySegment since it is not valid for SQL
Assert.assertTrue(context.containsKey(PlannerContext.CTX_SQL_QUERY_ID));
@ -457,7 +506,7 @@ public class SqlStatementTest
.auth(CalciteTests.REGULAR_USER_AUTH_RESULT)
.build();
DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq);
Map<String, Object> context = stmt.sqlRequest().context().getMergedParams();
Map<String, Object> context = stmt.query().context().getMergedParams();
Assert.assertEquals(2, context.size());
// Statement should contain default query context values
for (String defaultContextKey : defaultQueryConfig.getContext().keySet()) {

View File

@ -88,6 +88,9 @@ import java.util.stream.Collectors;
public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
{
// Timeout to allow (rapid) debugging, while not blocking tests with errors.
private static final int WAIT_TIMEOUT_SECS = 60;
private SpecificSegmentsQuerySegmentWalker walker;
private TestServerInventoryView serverView;
private List<ImmutableDruidServer> druidServers;
@ -931,11 +934,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
segmentDataSourceNames.add("foo");
joinableDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for build twice
Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@ -952,11 +955,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
segmentDataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for build
Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@ -996,10 +999,10 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
segmentDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);
@ -1017,11 +1020,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
segmentDataSourceNames.remove("foo");
serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER);
Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for build
Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
// wait for get again, just to make sure table has been updated (latch counts down just before tables are updated)
Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS));
Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
fooTable = schema.getDatasource("foo");
Assert.assertNotNull(fooTable);

View File

@ -110,6 +110,7 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -133,7 +134,8 @@ public class SqlResourceTest extends CalciteTestBase
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DUMMY_SQL_QUERY_ID = "dummy";
private static final int WAIT_TIMEOUT_SECS = 3;
// Timeout to allow (rapid) debugging, while not blocking tests with errors.
private static final int WAIT_TIMEOUT_SECS = 60;
private static final Consumer<DirectStatement> NULL_ACTION = s -> {};
private static final List<String> EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS =
@ -1917,11 +1919,11 @@ public class SqlResourceTest extends CalciteTestBase
}
@Override
public PlannerResult plan(DruidPlanner planner)
public PlannerResult createPlan(DruidPlanner planner)
{
if (planLatchSupplier.get() != null) {
if (planLatchSupplier.get().rhs) {
PlannerResult result = super.plan(planner);
PlannerResult result = super.createPlan(planner);
planLatchSupplier.get().lhs.countDown();
return result;
} else {
@ -1933,45 +1935,52 @@ public class SqlResourceTest extends CalciteTestBase
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.plan(planner);
return super.createPlan(planner);
}
} else {
return super.plan(planner);
return super.createPlan(planner);
}
}
@Override
public Sequence<Object[]> execute()
public ResultSet plan()
{
onExecute.accept(this);
return super.execute();
return super.plan();
}
@Override
public Sequence<Object[]> doExecute()
public ResultSet createResultSet(PlannerResult plannerResult)
{
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
return new ResultSet(plannerResult)
{
@Override
public Sequence<Object[]> run()
{
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
if (executeLatchSupplier.get() != null) {
if (executeLatchSupplier.get().rhs) {
Sequence<Object[]> sequence = sequenceMapFn.apply(super.doExecute());
executeLatchSupplier.get().lhs.countDown();
return sequence;
} else {
try {
if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
throw new RuntimeException("Latch timed out");
if (executeLatchSupplier.get() != null) {
if (executeLatchSupplier.get().rhs) {
Sequence<Object[]> sequence = sequenceMapFn.apply(super.run());
executeLatchSupplier.get().lhs.countDown();
return sequence;
} else {
try {
if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
throw new RuntimeException("Latch timed out");
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return sequenceMapFn.apply(super.run());
}
} else {
return sequenceMapFn.apply(super.run());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return sequenceMapFn.apply(super.doExecute());
}
} else {
return sequenceMapFn.apply(super.doExecute());
}
};
}
}
}