msq-test-0

This commit is contained in:
Zoltan Haindrich 2024-07-17 15:38:50 +00:00
parent 8ada2ff238
commit 42b3086512
3 changed files with 96 additions and 11 deletions

View File

@ -20,23 +20,37 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.initialization.ServerInjectorBuilderTest.TestDruidModule;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.exec.MSQDrillWindowQueryTest.DrillWindowQueryMSQComponentSupplier;
import org.apache.druid.msq.guice.MSQSqlModule;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.test.CalciteMSQTestsHelper;
import org.apache.druid.msq.test.ExtractResultsFactory;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.msq.test.VerifyMSQSupportedNativeQueriesPredicate;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.DrillWindowQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.SqlEngine;
@SqlTestFrameworkConfig.ComponentSupplier(DrillWindowQueryMSQComponentSupplier.class)
@ -54,6 +68,79 @@ public class MSQDrillWindowQueryTest extends DrillWindowQueryTest
{
super.configureGuice(builder);
builder.addModules(CalciteMSQTestsHelper.fetchModules(tempDirProducer::newTempFolder, TestGroupByBuffers.createDefault()).toArray(new Module[0]));
builder.addModule(new TestMSQSqlModule());
}
/**
* More or less {@link MSQSqlModule} but for tests.
*/
static class TestMSQSqlModule extends TestDruidModule {
@Provides
@LazySingleton
public SqlToolbox makeSqlToolbox(
final PlannerFactory plannerFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final QueryScheduler queryScheduler,
final Supplier<DefaultQueryConfig> defaultQueryConfig,
final SqlLifecycleManager sqlLifecycleManager
)
{
return new SqlToolbox(
null,
plannerFactory,
emitter,
requestLogger,
queryScheduler,
defaultQueryConfig.get(),
sqlLifecycleManager
);
}
@Provides
@NativeQuery
@LazySingleton
public SqlStatementFactory makeNativeSqlStatementFactory(
final MSQTaskSqlEngine sqlEngine,
SqlToolbox toolbox
)
{
return new SqlStatementFactory(toolbox.withEngine(sqlEngine));
}
@Provides
@LazySingleton
public MSQTaskSqlEngine createEngine(
ObjectMapper queryJsonMapper,
MSQTestOverlordServiceClient indexingServiceClient
)
{
return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
}
@Provides
@LazySingleton
private MSQTestOverlordServiceClient createOverlordClient(ObjectMapper queryJsonMapper, Injector injector)
{
final WorkerMemoryParameters workerMemoryParameters =
WorkerMemoryParameters.createInstance(
WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
2,
10,
2,
0,
0
);
final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
queryJsonMapper,
injector,
new MSQTestTaskActionClient(queryJsonMapper, injector),
workerMemoryParameters,
ImmutableList.of()
);
return indexingServiceClient;
}
}
@Override

View File

@ -7,13 +7,12 @@ from wikipedia
where cityName in ('New York', 'Aarhus')
group by 1
order by 1;
+----------+-----+------+
| cityName | cnt | aall |
+----------+-----+------+
| Aarhus | 0 | 1 |
| New York | 7 | 13 |
+----------+-----+------+
(2 rows)
+--------------------------------------------+
| TASK |
+--------------------------------------------+
| query-8f888fed-ca24-4f93-85bc-96ed1cb6b7da |
+--------------------------------------------+
(1 row)
!ok
LogicalSort(sort0=[$0], dir0=[ASC])
@ -26,13 +25,13 @@ LogicalSort(sort0=[$0], dir0=[ASC])
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0}], cnt=[COUNT($1) FILTER $2], aall=[COUNT()])
LogicalProject(cityName=[$2], channel=[$1], $f3=[IS TRUE(>($17, 0))])
LogicalFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR(8), 'New York':VARCHAR(8)]:VARCHAR(8))])
LogicalFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR, 'New York':VARCHAR]:VARCHAR)])
LogicalTableScan(table=[[druid, wikipedia]])
!logicalPlan
DruidAggregate(group=[{0}], cnt=[COUNT($1) FILTER $2], aall=[COUNT()], druid=[logical])
DruidProject(cityName=[$2], channel=[$1], $f3=[IS TRUE(>($17, 0))], druid=[logical])
DruidFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR(8), 'New York':VARCHAR(8)]:VARCHAR(8))])
DruidFilter(condition=[SEARCH($2, Sarg['Aarhus':VARCHAR, 'New York':VARCHAR]:VARCHAR)])
DruidTableScan(table=[[druid, wikipedia]], druid=[logical])
!druidPlan

View File

@ -69,7 +69,6 @@ import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.apache.druid.sql.calcite.util.SqlTestFramework.Builder;
import org.apache.druid.sql.calcite.util.SqlTestFramework.PlannerComponentSupplier;
import org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.server.Server;
@ -259,7 +258,7 @@ public class DruidAvaticaTestDriver implements Driver
binder.bind(QueryScheduler.class)
.toProvider(QuerySchedulerProvider.class)
.in(LazySingleton.class);
binder.install(new SqlModule.SqlStatementFactoryModule());
// binder.install(new SqlModule.SqlStatementFactoryModule());
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>()
{
}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));