fix types/resultset/etc

This commit is contained in:
Zoltan Haindrich 2024-07-17 19:30:33 +00:00
parent c59f1adcc8
commit 8b26e490e9
7 changed files with 167 additions and 74 deletions

View File

@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.config.Configs;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.List;
@ -135,5 +136,18 @@ public class MSQResultsReport
{
return name + ":" + type;
}
public static RowSignature toRowSignature(List<ColumnAndType> columnAndTypes)
{
final RowSignature.Builder builder = RowSignature.builder();
for (MSQResultsReport.ColumnAndType columnAndType : columnAndTypes) {
builder.add(columnAndType.getName(), columnAndType.getType());
}
RowSignature rowSignature = builder.build();
if (rowSignature.size() != columnAndTypes.size()) {
throw new IllegalArgumentException("Duplicate column names are not allowed in RowSignature");
}
return rowSignature;
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.avatica.MSQDruidMeta;
import org.apache.druid.sql.calcite.DrillWindowQueryTest;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.google.inject.Inject;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.avatica.AbstractDruidJdbcStatement;
import org.apache.druid.sql.avatica.AvaticaServerConfig;
import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.avatica.ErrorHandler;
public class MSQDruidMeta extends DruidMeta
{
@Inject
public MSQDruidMeta(
final @MultiStageQuery SqlStatementFactory sqlStatementFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final AuthenticatorMapper authMapper)
{
super(sqlStatementFactory, config, errorHandler, authMapper);
}
@Override
protected ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows)
{
return super.doFetch(druidStatement, maxRows);
}
}

View File

@ -19,21 +19,18 @@
package org.apache.druid.msq.exec;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.quidem.DruidQuidemTestBase;
import org.apache.druid.quidem.ProjectPathUtils;
import org.junit.jupiter.api.condition.EnabledIf;
import java.io.File;
@EnabledIf(value = "enabled", disabledReason = "These tests are only run in SqlCompatible mode!")
//@EnabledIf(value = "enabled", disabledReason = "These tests are only run in SqlCompatible mode!")
public class MSQQuidemTest extends DruidQuidemTestBase
{
public static boolean enabled()
{
NullHandling.initializeForTests();
return NullHandling.sqlCompatible();
}
// public static boolean enabled()
// {
// NullHandling.initializeForTests();
// return NullHandling.sqlCompatible();
// }
public MSQQuidemTest()
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.test;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.segment.column.RowSignature;
@ -104,7 +105,7 @@ public class ExtractResultsFactory implements QueryTestRunner.QueryRunStepFactor
}
extractedResults.add(
results.withSignatureAndResults(
convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows)
ColumnAndType.toRowSignature(payload.getResults().getSignature()), resultRows)
);
}
}
@ -114,15 +115,6 @@ public class ExtractResultsFactory implements QueryTestRunner.QueryRunStepFactor
{
return extractedResults;
}
private RowSignature convertColumnAndTypeToRowSignature(final List<MSQResultsReport.ColumnAndType> columnAndTypes)
{
final RowSignature.Builder builder = RowSignature.builder();
for (MSQResultsReport.ColumnAndType columnAndType : columnAndTypes) {
builder.add(columnAndType.getName(), columnAndType.getType());
}
return builder.build();
}
};
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.avatica;
//package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.report.MSQResultsReport.ColumnAndType;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.Collections;
import java.util.List;
public class MSQDruidMeta extends DruidMeta
{
protected final MSQTestOverlordServiceClient overlordClient;
@Inject
public MSQDruidMeta(
final @MultiStageQuery SqlStatementFactory sqlStatementFactory,
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final AuthenticatorMapper authMapper,
final MSQTestOverlordServiceClient overlordClient)
{
super(sqlStatementFactory, config, errorHandler, authMapper);
this.overlordClient = overlordClient;
}
@Override
protected ExecuteResult doFetch(AbstractDruidJdbcStatement druidStatement, int maxRows)
{
String taskId = extractTaskId(druidStatement);
MSQTaskReportPayload payload = (MSQTaskReportPayload) overlordClient.getReportForTask(taskId)
.get(MSQTaskReport.REPORT_KEY)
.getPayload();
if (payload.getStatus().getStatus().isFailure()) {
throw new ISE(
"Query task [%s] failed due to %s",
taskId,
payload.getStatus().getErrorReport().toString()
);
}
if (!payload.getStatus().getStatus().isComplete()) {
throw new ISE("Query task [%s] should have finished", taskId);
}
final List<?> resultRows = MSQTestBase.getRows(payload.getResults());
if (resultRows == null) {
throw new ISE("Results report not present in the task's report payload");
}
// extractedResults.add(
// results.withSignatureAndResults(
// convertColumnAndTypeToRowSignature(payload.getResults().getSignature()), resultRows
// )
// );
Signature signature = makeSignature(druidStatement, payload.getResults().getSignature());
@SuppressWarnings("unchecked")
Frame firstFrame = Frame.create(0, true, (List<Object>) resultRows);
return new ExecuteResult(
ImmutableList.of(
MetaResultSet.create(
druidStatement.connectionId,
druidStatement.statementId,
false,
signature,
firstFrame
)
)
);
}
private Signature makeSignature(AbstractDruidJdbcStatement druidStatement, List<ColumnAndType> cat)
{
RowSignature sig = ColumnAndType.toRowSignature(cat);
RelDataType rowType = decodeRowRelDataType(sig);
return Meta.Signature.create(
AbstractDruidJdbcStatement.createColumnMetaData(rowType),
"some query?",
Collections.emptyList(),
Meta.CursorFactory.ARRAY,
Meta.StatementType.SELECT // We only support SELECT
);
}
private RelDataType decodeRowRelDataType(RowSignature sig)
{
return RowSignatures.toRelDataType(sig, DruidTypeSystem.TYPE_FACTORY);
// typeFactory.createStructType(
// signature.stream()
// .map(columnAndType -> relDataTypeFactory.createJavaType(columnAndType.getType()))
// .toArray(RelDataType[]::new));
}
private String extractTaskId(AbstractDruidJdbcStatement druidStatement)
{
ExecuteResult r = super.doFetch(druidStatement, 2);
Object[] row = (Object[]) r.resultSets.get(0).firstFrame.rows.iterator().next();
return (String) row[0];
}
}

View File

@ -7,12 +7,13 @@ from wikipedia
where cityName in ('New York', 'Aarhus')
group by 1
order by 1;
+--------------------------------------------+
| TASK |
+--------------------------------------------+
| query-09ed2fdf-e74e-4d61-9a5b-b9ce4949aa48 |
+--------------------------------------------+
(1 row)
+----------+-----+------+
| cityName | cnt | aall |
+----------+-----+------+
| Aarhus | 0 | 1 |
| New York | 7 | 13 |
+----------+-----+------+
(2 rows)
!ok
LogicalSort(sort0=[$0], dir0=[ASC])