From eb902375a233266229c6955cbe16833992579605 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 18 Aug 2022 14:01:06 -0700 Subject: [PATCH] Light refactor of the heavily refactored statement classes (#12909) Reflects lessons learned from working with consumers of the new code. --- .../apache/druid/sql/AbstractStatement.java | 30 ++- .../org/apache/druid/sql/DirectStatement.java | 199 +++++++++++++++--- .../org/apache/druid/sql/HttpStatement.java | 5 - .../druid/sql/SqlPlanningException.java | 6 + .../avatica/DruidJdbcPreparedStatement.java | 2 +- .../druid/sql/avatica/DruidJdbcResultSet.java | 2 +- .../sql/calcite/planner/PlannerResult.java | 8 +- .../apache/druid/sql/http/SqlResource.java | 7 +- .../apache/druid/sql/SqlStatementTest.java | 63 +++++- .../schema/SegmentMetadataCacheTest.java | 27 +-- .../druid/sql/http/SqlResourceTest.java | 61 +++--- 11 files changed, 305 insertions(+), 105 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java index 2a001b7e509..99605918e23 100644 --- a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java @@ -30,7 +30,6 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.calcite.planner.DruidPlanner; import org.apache.druid.sql.calcite.planner.PlannerContext; -import org.apache.druid.sql.calcite.planner.PlannerResult; import java.io.Closeable; import java.util.Set; @@ -160,20 +159,6 @@ public abstract class AbstractStatement implements Closeable ); } - /** - * Plan the query, which also produces the sequence that runs - * the query. - */ - protected PlannerResult plan(DruidPlanner planner) - { - try { - return planner.plan(); - } - catch (ValidationException e) { - throw new SqlPlanningException(e); - } - } - /** * Return the datasource and table resources for this * statement. @@ -188,7 +173,7 @@ public abstract class AbstractStatement implements Closeable return fullResourceActions; } - public SqlQueryPlus sqlRequest() + public SqlQueryPlus query() { return queryPlus; } @@ -220,4 +205,17 @@ public abstract class AbstractStatement implements Closeable public void closeQuietly() { } + + /** + * Convenience method to close the statement and report an error + * associated with the statement. Same as calling:{@code + * stmt.reporter().failed(e); + * stmt.close(); + * } + */ + public void closeWithError(Throwable e) + { + reporter.failed(e); + close(); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java index a8931bf73da..03318ed8022 100644 --- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java @@ -19,15 +19,20 @@ package org.apache.druid.sql; +import com.google.common.annotations.VisibleForTesting; +import org.apache.calcite.tools.ValidationException; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.SqlLifecycleManager.Cancelable; import org.apache.druid.sql.calcite.planner.DruidPlanner; import org.apache.druid.sql.calcite.planner.PlannerResult; import org.apache.druid.sql.calcite.planner.PrepareResult; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -62,9 +67,91 @@ public class DirectStatement extends AbstractStatement implements Cancelable { private static final Logger log = new Logger(DirectStatement.class); + /** + * Represents the execution plan for a query with the ability to run + * that plan (once). + */ + public class ResultSet implements Cancelable + { + private final PlannerResult plannerResult; + + public ResultSet(PlannerResult plannerResult) + { + this.plannerResult = plannerResult; + } + + public SqlQueryPlus query() + { + return queryPlus; + } + + /** + * Convenience method for the split plan/run case to ensure that the statement + * can, in fact, be run. + */ + public boolean runnable() + { + return plannerResult != null && plannerResult.runnable(); + } + + /** + * Do the actual execute step which allows subclasses to wrap the sequence, + * as is sometimes needed for testing. + */ + public Sequence run() + { + try { + // Check cancellation. Required for SqlResourceTest to work. + transition(State.RAN); + return plannerResult.run(); + } + catch (RuntimeException e) { + reporter.failed(e); + throw e; + } + } + + public SqlRowTransformer createRowTransformer() + { + return new SqlRowTransformer(plannerContext.getTimeZone(), plannerResult.rowType()); + } + + public SqlExecutionReporter reporter() + { + return reporter; + } + + @Override + public Set resources() + { + return DirectStatement.this.resources(); + } + + @Override + public void cancel() + { + DirectStatement.this.cancel(); + } + + public void close() + { + DirectStatement.this.close(); + } + } + + private enum State + { + START, + PREPARED, + RAN, + CANCELLED, + FAILED, + CLOSED + } + protected PrepareResult prepareResult; - protected PlannerResult plannerResult; - private volatile boolean canceled; + protected ResultSet resultSet; + private volatile State state = State.START; public DirectStatement( final SqlToolbox lifecycleToolbox, @@ -84,7 +171,21 @@ public class DirectStatement extends AbstractStatement implements Cancelable } /** - * Direct execution of a query, including: + * Convenience method to perform Direct execution of a query. Does both + * the {@link #plan()} step and the {@link ResultSet#run()} step. + * + * @return sequence which delivers query results + */ + public Sequence execute() + { + return plan().run(); + } + + /** + * Prepares and plans a query for execution, returning a result set to + * execute the query. In Druid, prepare and plan are different: prepare provides + * information about the query, but plan does the "real" preparation to create + * an actual executable plan. *
    *
  • Create the planner.
  • *
  • Parse the statement.
  • @@ -93,16 +194,14 @@ public class DirectStatement extends AbstractStatement implements Cancelable *
  • Validate the query against the Druid catalog.
  • *
  • Authorize access to the resources which the query needs.
  • *
  • Plan the query.
  • - *
  • Return a {@link Sequence} which executes the query and returns results.
  • *
- * - * This method is called from the request thread; results are read in the - * response thread. - * - * @return sequence which delivers query results + * Call {@link ResultSet#run()} to run the resulting plan. */ - public Sequence execute() + public ResultSet plan() { + if (state != State.START) { + throw new ISE("Can plan a query only once."); + } try (DruidPlanner planner = sqlToolbox.plannerFactory.createPlanner( sqlToolbox.engine, queryPlus.sql(), @@ -114,49 +213,59 @@ public class DirectStatement extends AbstractStatement implements Cancelable // Tests cancel during this call; real clients might do so if the plan // or execution prep stages take too long for some unexpected reason. sqlToolbox.sqlLifecycleManager.add(sqlQueryId(), this); - checkCanceled(); - plannerResult = plan(planner); + transition(State.PREPARED); + resultSet = createResultSet(createPlan(planner)); prepareResult = planner.prepareResult(); - return doExecute(); + // Double check needed by SqlResourceTest + transition(State.PREPARED); + return resultSet; } catch (RuntimeException e) { + state = State.FAILED; reporter.failed(e); throw e; } } + /** + * Plan the query, which also produces the sequence that runs + * the query. + */ + @VisibleForTesting + protected PlannerResult createPlan(DruidPlanner planner) + { + try { + return planner.plan(); + } + catch (ValidationException e) { + throw new SqlPlanningException(e); + } + } + + /** + * Wrapper around result set creation for the sole purpose of tests which + * inject failures. + */ + @VisibleForTesting + protected ResultSet createResultSet(PlannerResult plannerResult) + { + return new ResultSet(plannerResult); + } + public PrepareResult prepareResult() { return prepareResult; } - /** - * Do the actual execute step which allows subclasses to wrap the sequence, - * as is sometimes needed for testing. - */ - protected Sequence doExecute() - { - // Check cancellation here and not in execute() above: - // required for SqlResourceTest to work. - checkCanceled(); - try { - return plannerResult.run(); - } - catch (RuntimeException e) { - reporter.failed(e); - throw e; - } - } - /** * Checks for cancellation. As it turns out, this is really just a test-time * check: an actual client can't cancel the query until the query reports * a query ID, which won't happen until after the {@link #execute())} * call. */ - private void checkCanceled() + private void transition(State newState) { - if (canceled) { + if (state == State.CANCELLED) { throw new QueryInterruptedException( QueryInterruptedException.QUERY_CANCELED, StringUtils.format("Query is canceled [%s]", sqlQueryId()), @@ -164,12 +273,16 @@ public class DirectStatement extends AbstractStatement implements Cancelable null ); } + state = newState; } @Override public void cancel() { - canceled = true; + if (state == State.CLOSED) { + return; + } + state = State.CANCELLED; final CopyOnWriteArrayList nativeQueryIds = plannerContext.getNativeQueryIds(); for (String nativeQueryId : nativeQueryIds) { @@ -177,4 +290,22 @@ public class DirectStatement extends AbstractStatement implements Cancelable sqlToolbox.queryScheduler.cancelQuery(nativeQueryId); } } + + @Override + public void close() + { + if (state != State.START && state != State.CLOSED) { + super.close(); + state = State.CLOSED; + } + } + + @Override + public void closeWithError(Throwable e) + { + if (state != State.START && state != State.CLOSED) { + super.closeWithError(e); + state = State.CLOSED; + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java index 31abd15a427..52bef0a04f0 100644 --- a/sql/src/main/java/org/apache/druid/sql/HttpStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/HttpStatement.java @@ -68,9 +68,4 @@ public class HttpStatement extends DirectStatement sqlToolbox.plannerFactory.getAuthorizerMapper() ); } - - public SqlRowTransformer createRowTransformer() - { - return new SqlRowTransformer(plannerContext.getTimeZone(), plannerResult.rowType()); - } } diff --git a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java index a08ad53674a..fb4e4f439b1 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlPlanningException.java @@ -22,6 +22,7 @@ package org.apache.druid.sql; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.ValidationException; import org.apache.druid.query.BadQueryException; @@ -67,6 +68,11 @@ public class SqlPlanningException extends BadQueryException this(PlanningError.VALIDATION_ERROR, e.getMessage()); } + public SqlPlanningException(CalciteContextException e) + { + this(PlanningError.VALIDATION_ERROR, e.getMessage()); + } + public SqlPlanningException(PlanningError planningError, String errorMessage) { this(planningError.errorCode, errorMessage, planningError.errorClass); diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java index 428dcbf6ef6..3cd608addb7 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java @@ -65,7 +65,7 @@ public class DruidJdbcPreparedStatement extends AbstractDruidJdbcStatement PrepareResult prepareResult = sqlStatement.prepare(); signature = createSignature( prepareResult, - sqlStatement.sqlRequest().sql() + sqlStatement.query().sql() ); state = State.PREPARED; } diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java index 15a5e36770d..36a69dd8154 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java @@ -114,7 +114,7 @@ public class DruidJdbcResultSet implements Closeable yielder = Yielders.each(retSequence); signature = AbstractDruidJdbcStatement.createSignature( stmt.prepareResult(), - stmt.sqlRequest().sql() + stmt.query().sql() ); } catch (ExecutionException e) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java index c571c42ab04..619f6c5509c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.planner; import com.google.common.base.Supplier; import org.apache.calcite.rel.type.RelDataType; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequence; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,6 +45,11 @@ public class PlannerResult this.rowType = rowType; } + public boolean runnable() + { + return !didRun.get(); + } + /** * Run the query */ @@ -51,7 +57,7 @@ public class PlannerResult { if (!didRun.compareAndSet(false, true)) { // Safety check. - throw new IllegalStateException("Cannot run more than once"); + throw new ISE("Cannot run more than once"); } return resultsSupplier.get(); } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 6a3787b7ea0..28623a622e6 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -44,6 +44,7 @@ import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.DirectStatement.ResultSet; import org.apache.druid.sql.HttpStatement; import org.apache.druid.sql.SqlExecutionReporter; import org.apache.druid.sql.SqlLifecycleManager; @@ -67,6 +68,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; + import java.io.IOException; import java.util.List; import java.util.Set; @@ -135,8 +137,9 @@ public class SqlResource try { Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId)); - final Sequence sequence = stmt.execute(); - final SqlRowTransformer rowTransformer = stmt.createRowTransformer(); + ResultSet resultSet = stmt.plan(); + final Sequence sequence = resultSet.run(); + final SqlRowTransformer rowTransformer = resultSet.createRowTransformer(); final Yielder yielder0 = Yielders.each(sequence); try { diff --git a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java index 41f9129032c..2004fe9988b 100644 --- a/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/SqlStatementTest.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; @@ -45,6 +46,7 @@ import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.ForbiddenException; +import org.apache.druid.sql.DirectStatement.ResultSet; import org.apache.druid.sql.SqlPlanningException.PlanningError; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; @@ -68,6 +70,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import javax.servlet.http.HttpServletRequest; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -75,6 +78,9 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class SqlStatementTest @@ -213,10 +219,53 @@ public class SqlStatementTest "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", CalciteTests.REGULAR_USER_AUTH_RESULT); DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); - List results = stmt.execute().toList(); + ResultSet resultSet = stmt.plan(); + assertTrue(resultSet.runnable()); + List results = resultSet.run().toList(); assertEquals(1, results.size()); assertEquals(6L, results.get(0)[0]); assertEquals("foo", results.get(0)[1]); + assertSame(stmt.reporter(), resultSet.reporter()); + assertSame(stmt.resources(), resultSet.resources()); + assertSame(stmt.query(), resultSet.query()); + assertFalse(resultSet.runnable()); + resultSet.close(); + stmt.close(); + } + + @Test + public void testDirectPlanTwice() + { + SqlQueryPlus sqlReq = queryPlus( + "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", + CalciteTests.REGULAR_USER_AUTH_RESULT); + DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); + stmt.plan(); + try { + stmt.plan(); + fail(); + } + catch (ISE e) { + stmt.closeWithError(e); + } + } + + @Test + public void testDirectExecTwice() + { + SqlQueryPlus sqlReq = queryPlus( + "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", + CalciteTests.REGULAR_USER_AUTH_RESULT); + DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); + ResultSet resultSet = stmt.plan(); + resultSet.run(); + try { + resultSet.run(); + fail(); + } + catch (ISE e) { + stmt.closeWithError(e); + } } @Test @@ -352,7 +401,7 @@ public class SqlStatementTest // Prepared statements: using a prepare/execute model. @Test - public void testJdbcHappyPath() + public void testPreparedHappyPath() { SqlQueryPlus sqlReq = queryPlus( "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", @@ -381,7 +430,7 @@ public class SqlStatementTest } @Test - public void testJdbcSyntaxError() + public void testPrepareSyntaxError() { SqlQueryPlus sqlReq = queryPlus( "SELECT COUNT(*) AS cnt, 'foo' AS", @@ -398,7 +447,7 @@ public class SqlStatementTest } @Test - public void testJdbcValidationError() + public void testPrepareValidationError() { SqlQueryPlus sqlReq = queryPlus( "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.bogus", @@ -415,7 +464,7 @@ public class SqlStatementTest } @Test - public void testJdbcPermissionError() + public void testPreparePermissionError() { SqlQueryPlus sqlReq = queryPlus( "select count(*) from forbiddenDatasource", @@ -442,7 +491,7 @@ public class SqlStatementTest .auth(CalciteTests.REGULAR_USER_AUTH_RESULT) .build(); DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); - Map context = stmt.sqlRequest().context().getMergedParams(); + Map context = stmt.query().context().getMergedParams(); Assert.assertEquals(2, context.size()); // should contain only query id, not bySegment since it is not valid for SQL Assert.assertTrue(context.containsKey(PlannerContext.CTX_SQL_QUERY_ID)); @@ -457,7 +506,7 @@ public class SqlStatementTest .auth(CalciteTests.REGULAR_USER_AUTH_RESULT) .build(); DirectStatement stmt = sqlStatementFactory.directStatement(sqlReq); - Map context = stmt.sqlRequest().context().getMergedParams(); + Map context = stmt.query().context().getMergedParams(); Assert.assertEquals(2, context.size()); // Statement should contain default query context values for (String defaultContextKey : defaultQueryConfig.getContext().keySet()) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java index 39029fa9a48..99caee81253 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentMetadataCacheTest.java @@ -88,6 +88,9 @@ import java.util.stream.Collectors; public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon { + // Timeout to allow (rapid) debugging, while not blocking tests with errors. + private static final int WAIT_TIMEOUT_SECS = 60; + private SpecificSegmentsQuerySegmentWalker walker; private TestServerInventoryView serverView; private List druidServers; @@ -931,11 +934,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon segmentDataSourceNames.add("foo"); joinableDataSourceNames.add("foo"); serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); - Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for build twice - Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) - Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); @@ -952,11 +955,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon segmentDataSourceNames.remove("foo"); serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); - Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for build - Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) - Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); @@ -996,10 +999,10 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon segmentDataSourceNames.add("foo"); serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); - Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS)); - Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); + Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) - Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); @@ -1017,11 +1020,11 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon segmentDataSourceNames.remove("foo"); serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); - Assert.assertTrue(markDataSourceLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(markDataSourceLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for build - Assert.assertTrue(buildTableLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(buildTableLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) - Assert.assertTrue(getDatasourcesLatch.await(2, TimeUnit.SECONDS)); + Assert.assertTrue(getDatasourcesLatch.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)); fooTable = schema.getDatasource("foo"); Assert.assertNotNull(fooTable); diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index 6560562da2c..7fe468a2e7f 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -110,6 +110,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -133,7 +134,8 @@ public class SqlResourceTest extends CalciteTestBase { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final String DUMMY_SQL_QUERY_ID = "dummy"; - private static final int WAIT_TIMEOUT_SECS = 3; + // Timeout to allow (rapid) debugging, while not blocking tests with errors. + private static final int WAIT_TIMEOUT_SECS = 60; private static final Consumer NULL_ACTION = s -> {}; private static final List EXPECTED_COLUMNS_FOR_RESULT_FORMAT_TESTS = @@ -1917,11 +1919,11 @@ public class SqlResourceTest extends CalciteTestBase } @Override - public PlannerResult plan(DruidPlanner planner) + public PlannerResult createPlan(DruidPlanner planner) { if (planLatchSupplier.get() != null) { if (planLatchSupplier.get().rhs) { - PlannerResult result = super.plan(planner); + PlannerResult result = super.createPlan(planner); planLatchSupplier.get().lhs.countDown(); return result; } else { @@ -1933,45 +1935,52 @@ public class SqlResourceTest extends CalciteTestBase catch (InterruptedException e) { throw new RuntimeException(e); } - return super.plan(planner); + return super.createPlan(planner); } } else { - return super.plan(planner); + return super.createPlan(planner); } } @Override - public Sequence execute() + public ResultSet plan() { onExecute.accept(this); - return super.execute(); + return super.plan(); } @Override - public Sequence doExecute() + public ResultSet createResultSet(PlannerResult plannerResult) { - final Function, Sequence> sequenceMapFn = - Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity()); + return new ResultSet(plannerResult) + { + @Override + public Sequence run() + { + final Function, Sequence> sequenceMapFn = + Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity()); - if (executeLatchSupplier.get() != null) { - if (executeLatchSupplier.get().rhs) { - Sequence sequence = sequenceMapFn.apply(super.doExecute()); - executeLatchSupplier.get().lhs.countDown(); - return sequence; - } else { - try { - if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { - throw new RuntimeException("Latch timed out"); + if (executeLatchSupplier.get() != null) { + if (executeLatchSupplier.get().rhs) { + Sequence sequence = sequenceMapFn.apply(super.run()); + executeLatchSupplier.get().lhs.countDown(); + return sequence; + } else { + try { + if (!executeLatchSupplier.get().lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) { + throw new RuntimeException("Latch timed out"); + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + return sequenceMapFn.apply(super.run()); } + } else { + return sequenceMapFn.apply(super.run()); } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - return sequenceMapFn.apply(super.doExecute()); } - } else { - return sequenceMapFn.apply(super.doExecute()); - } + }; } } }