extend/cleanup/etc

This commit is contained in:
Zoltan Haindrich 2024-08-05 13:41:53 +00:00
parent bc70443c7f
commit f4af51ef7f
2 changed files with 37 additions and 52 deletions

View File

@ -20,26 +20,26 @@
package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.calcite.rel.RelRoot;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.TestMSQSqlModule;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.CalciteJoinQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.planner.JoinAlgorithm;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import java.util.Map;
/**
* Runs {@link CalciteJoinQueryTest} but with MSQ engine.
*/
@ -48,7 +48,7 @@ public class CalciteSelectJoinQueryMSQTest
/**
* Run all tests with {@link JoinAlgorithm#BROADCAST}.
*/
@SqlTestFrameworkConfig.ComponentSupplier(BroadcastJoinComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(JoinComponentSupplier.class)
public static class BroadcastTest extends Base
{
@Override
@ -57,12 +57,18 @@ public class CalciteSelectJoinQueryMSQTest
return super.testBuilder()
.verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate());
}
@Override
protected JoinAlgorithm joinAlgorithm()
{
return JoinAlgorithm.BROADCAST;
}
}
/**
* Run all tests with {@link JoinAlgorithm#SORT_MERGE}.
*/
@SqlTestFrameworkConfig.ComponentSupplier(SortMergeJoinComponentSupplier.class)
@SqlTestFrameworkConfig.ComponentSupplier(JoinComponentSupplier.class)
public static class SortMergeTest extends Base
{
@Override
@ -79,14 +85,27 @@ public class CalciteSelectJoinQueryMSQTest
return super.testBuilder()
.verifyNativeQueries(xs -> false);
}
@Override
protected JoinAlgorithm joinAlgorithm()
{
return JoinAlgorithm.SORT_MERGE;
}
}
public abstract static class Base extends CalciteJoinQueryTest
{
protected abstract JoinAlgorithm joinAlgorithm();
@Override
protected QueryTestBuilder testBuilder()
{
return new QueryTestBuilder(new CalciteTestConfig(true))
Map<String, Object> defaultCtx = ImmutableMap.<String, Object>builder()
.putAll(BaseCalciteQueryTest.QUERY_CONTEXT_DEFAULT)
.put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm().toString())
.build();
return new QueryTestBuilder(new CalciteTestConfig(defaultCtx, true))
.addCustomRunner(
new ExtractResultsFactory(
() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
@ -94,30 +113,11 @@ public class CalciteSelectJoinQueryMSQTest
}
}
protected static class SortMergeJoinComponentSupplier extends AbstractJoinComponentSupplier
public static final class JoinComponentSupplier extends StandardComponentSupplier
{
public SortMergeJoinComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer, JoinAlgorithm.SORT_MERGE);
}
}
protected static class BroadcastJoinComponentSupplier extends AbstractJoinComponentSupplier
{
public BroadcastJoinComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer, JoinAlgorithm.BROADCAST);
}
}
protected abstract static class AbstractJoinComponentSupplier extends StandardComponentSupplier
{
private JoinAlgorithm joinAlgorithm;
public AbstractJoinComponentSupplier(TempDirProducer tempFolderProducer, JoinAlgorithm joinAlgorithm)
public JoinComponentSupplier(TempDirProducer tempFolderProducer)
{
super(tempFolderProducer);
this.joinAlgorithm = joinAlgorithm;
}
@Override
@ -127,6 +127,7 @@ public class CalciteSelectJoinQueryMSQTest
builder.addModules(
CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0])
);
builder.addModule(new TestMSQSqlModule());
}
@Override
@ -136,29 +137,7 @@ public class CalciteSelectJoinQueryMSQTest
Injector injector
)
{
final WorkerMemoryParameters workerMemoryParameters = MSQTestBase.makeTestWorkerMemoryParameters();
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper)
{
@Override
public boolean featureAvailable(EngineFeature feature)
{
return super.featureAvailable(feature);
}
@Override
public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext)
{
plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString());
return super.buildQueryMakerForSelect(relRoot, plannerContext);
}
};
return injector.getInstance(MSQTaskSqlEngine.class);
}
}
}

View File

@ -887,6 +887,12 @@ public class BaseCalciteQueryTest extends CalciteTestBase
this.isRunningMSQ = isRunningMSQ;
}
public CalciteTestConfig(Map<String, Object> baseQueryContext, boolean isRunningMSQ)
{
this(baseQueryContext);
this.isRunningMSQ = isRunningMSQ;
}
public CalciteTestConfig(Map<String, Object> baseQueryContext)
{
Preconditions.checkNotNull(baseQueryContext, "baseQueryContext is null");