Merge remote-tracking branch 'apache/master' into quidem-runner-extension-submit

This commit is contained in:
Zoltan Haindrich 2024-05-21 06:53:46 +00:00
commit ecdcd0f621
23 changed files with 1102 additions and 87 deletions

View File

@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction, cds-task-schema-publish-disabled, cds-coordinator-smq-disabled]
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction, cds-task-schema-publish-disabled, cds-coordinator-metadata-query-disabled]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
@ -196,6 +196,6 @@ jobs:
with:
build_jdk: 8
runtime_jdk: 8
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema,cds-task-schema-publish-disabled,cds-coordinator-smq-disabled
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema,cds-task-schema-publish-disabled,cds-coordinator-metadata-query-disabled
use_indexer: ${{ matrix.indexer }}
group: other

View File

@ -3594,17 +3594,7 @@ Content-Type: application/json
<details>
<summary>View the response</summary>
```json
{
"id": "social_media",
"taskGroupIds": [
1,
2,
3
]
}
```
(empty response)
</details>
### Shut down a supervisor

View File

@ -34,7 +34,7 @@ Compressed big decimal is an absolute number based complex type based on big dec
2. Accuracy: Provides greater level of accuracy in decimal arithmetic
## Operations
To use this extension, make sure to [load](../../configuration/extensions.md#loading-extensions) `compressed-big-decimal` to your config file.
To use this extension, make sure to [load](../../configuration/extensions.md#loading-extensions) `druid-compressed-bigdecimal` to your config file.
## Configuration
There are currently no configuration properties specific to Compressed Big Decimal

View File

@ -277,6 +277,6 @@ The [DataSketches extension](../development/extensions-core/datasketches-extensi
|`CASE WHEN boolean_expr1 THEN result1 \[ WHEN boolean_expr2 THEN result2 ... \] \[ ELSE resultN \] END`|Searched CASE.|
|`CAST(value AS TYPE)`|Cast value to another type. See [Data types](sql-data-types.md) for details about how Druid SQL handles CAST.|
|`COALESCE(value1, value2, ...)`|Returns the first value that is neither NULL nor empty string.|
|`DECODE_BASE64_COMPLEX(dataType, expr)`| Decodes a Base64-encoded string into a complex data type, where `dataType` is the complex data type and `expr` is the Base64-encoded string to decode. The `hyperUnique` and `serializablePairLongString` data types are supported by default. You can enable support for the following complex data types by loading their extensions:<br/><ul><li>`druid-bloom-filter`: `bloom`</li><li>`druid-datasketches`: `arrayOfDoublesSketch`, `HLLSketch`, `KllDoublesSketch`, `KllFloatsSketch`, `quantilesDoublesSketch`, `thetaSketch`</li><li>`druid-histogram`: `approximateHistogram`, `fixedBucketsHistogram`</li><li>`druid-stats`: `variance`</li><li>`druid-compressed-big-decimal`: `compressedBigDecimal`</li><li>`druid-momentsketch`: `momentSketch`</li><li>`druid-tdigestsketch`: `tDigestSketch`</li></ul>|
|`DECODE_BASE64_COMPLEX(dataType, expr)`| Decodes a Base64-encoded string into a complex data type, where `dataType` is the complex data type and `expr` is the Base64-encoded string to decode. The `hyperUnique` and `serializablePairLongString` data types are supported by default. You can enable support for the following complex data types by loading their extensions:<br/><ul><li>`druid-bloom-filter`: `bloom`</li><li>`druid-datasketches`: `arrayOfDoublesSketch`, `HLLSketch`, `KllDoublesSketch`, `KllFloatsSketch`, `quantilesDoublesSketch`, `thetaSketch`</li><li>`druid-histogram`: `approximateHistogram`, `fixedBucketsHistogram`</li><li>`druid-stats`: `variance`</li><li>`druid-compressed-bigdecimal`: `compressedBigDecimal`</li><li>`druid-momentsketch`: `momentSketch`</li><li>`druid-tdigestsketch`: `tDigestSketch`</li></ul>|
|`NULLIF(value1, value2)`|Returns NULL if `value1` and `value2` match, else returns `value1`.|
|`NVL(value1, value2)`|Returns `value1` if `value1` is not null, otherwise `value2`.|

View File

@ -422,7 +422,7 @@ public class SupervisorResource
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
return Response.ok().build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))

View File

@ -203,7 +203,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly
boolean handoffEarly = false; // set by SupervisorManager.stopTaskGroupEarly
TaskGroup(
int groupId,
@ -268,14 +268,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return tasks.keySet();
}
void setShutdownEarly()
void setHandoffEarly()
{
shutdownEarly = true;
handoffEarly = true;
}
Boolean getShutdownEarly()
Boolean getHandoffEarly()
{
return shutdownEarly;
return handoffEarly;
}
@VisibleForTesting
@ -690,8 +690,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}
taskGroup.setShutdownEarly();
log.info("Task group [%d] for supervisor [%s] will handoff early.", taskGroupId, supervisorId);
taskGroup.setHandoffEarly();
}
}
@ -3194,7 +3194,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
// If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
@ -3202,7 +3202,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
< ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,

View File

@ -0,0 +1,448 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.catalog;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.testing.utils.DataLoaderHelper;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests that expect succesfully ingestng data into catalog defined tables and querying the data
* gives expected results.
*/
public abstract class ITCatalogIngestAndQueryTest
{
public static final Logger LOG = new Logger(ITCatalogIngestAndQueryTest.class);
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
private DataLoaderHelper dataLoaderHelper;
@Inject
private DruidClusterClient clusterClient;
private CatalogClient client;
private final String operationName;
private final String dmlPrefixPattern;
public ITCatalogIngestAndQueryTest()
{
this.operationName = getOperationName();
this.dmlPrefixPattern = getDmlPrefixPattern();
}
public abstract String getOperationName();
public abstract String getDmlPrefixPattern();
@Before
public void initializeClient()
{
client = new CatalogClient(clusterClient);
}
/**
* Create table with columns:
* <p>
* __time LONG
* double_col1 DOUBLE
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* <p>
* When querying the table with query: 'SELECT * from ##tableName', the BIGINT type column should
* be implicitly coherced into type DOUBLE when inserted into the table, since the column being
* written into is type DOUBLE.
* <p>
* __time, bigint_col1
* 2022-12-26T12:34:56,8.0
* 2022-12-26T12:34:56,8.0
* 2022-12-26T12:34:56,9.0
* 2022-12-26T12:34:56,10.0
*
*/
@Test
public void testInsertImplicitCast() throws Exception
{
String queryFile = "/catalog/implicitCast_select.sql";
String tableName = "testImplicitCast" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("double_col1", "DOUBLE")
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " c AS double_col1\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Create table with columns:
* <p>
* __time LONG
* double_col1 DOUBLE
* <p>
* and clustering columns defined in catalog as
* <p>
* bigInt_col1
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* <p>
* When querying the table with query: 'SELECT * from ##tableName', because of the clustering
* defined on the table, the data should be reordered to:
* <p>
* __time, bigint_col1
* 2022-12-26T12:34:56,8
* 2022-12-26T12:34:56,8
* 2022-12-26T12:34:56,9
* 2022-12-26T12:34:56,10
*
*/
@Test
public void testInsertWithClusteringFromCatalog() throws Exception
{
String queryFile = "/catalog/clustering_select.sql";
String tableName = "testWithClusteringFromCatalog" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("bigint_col1", "BIGINT")
.property(
DatasourceDefn.CLUSTER_KEYS_PROPERTY,
ImmutableList.of(new ClusterKeySpec("bigint_col1", false))
)
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " c AS bigint_col1\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Create table with columns:
* <p>
* __time LONG
* double_col1 DOUBLE
* <p>
* and clustering columns defined in query as
* <p>
* bigInt_col1
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* <p>
* When querying the table with query: 'SELECT * from ##tableName', because of the clustering
* defined on the table, the data should be reordered to:
* <p>
* __time, bigint_col1
* 2022-12-26T12:34:56,8
* 2022-12-26T12:34:56,8
* 2022-12-26T12:34:56,9
* 2022-12-26T12:34:56,10
*
*/
@Test
public void testInsertWithClusteringFromQuery() throws Exception
{
String queryFile = "/catalog/clustering_select.sql";
String tableName = "testWithClusteringFromQuery" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("bigint_col1", "BIGINT")
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " c AS bigint_col1\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "CLUSTERED BY \"bigint_col1\"\n";
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Create table with columns:
* <p>
* __time LONG
* varchar_col1 VARCHAR
* bigint_col1 BIGINT
* float_col1 FLOAT
* varchar_col2 VARCHAR
* <p>
* and multiple clustering columns defined in catalog as
* <p>
* bigInt_col1, varchar_col2
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* <p>
* When querying the table with query: 'SELECT * from ##tableName', because of the clustering
* defined on the table, the data should be reordered to:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
*
*/
@Test
public void testInsertWithMultiClusteringFromCatalog() throws Exception
{
String queryFile = "/catalog/multiClustering_select.sql";
String tableName = "testWithMultiClusteringFromCatalog" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col1", "VARCHAR")
.column("bigint_col1", "BIGINT")
.column("float_col1", "FLOAT")
.column("varchar_col2", "VARCHAR")
.property(
DatasourceDefn.CLUSTER_KEYS_PROPERTY,
ImmutableList.of(new ClusterKeySpec("bigint_col1", false), new ClusterKeySpec("varchar_col2", false))
)
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS varchar_col1,\n"
+ " c AS bigint_col1,\n"
+ " e AS float_col1,\n"
+ " f AS varchar_col2\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n";
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
/**
* Create table with columns:
* <p>
* __time LONG
* varchar_col1 VARCHAR
* bigint_col1 BIGINT
* float_col1 FLOAT
* varchar_col2 VARCHAR
* <p>
* and multiple clustering columns defined in query as
* <p>
* bigInt_col1, varchar_col2
* <p>
* And insert the following data:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* <p>
* When querying the table with query: 'SELECT * from ##tableName', because of the clustering
* defined on the query, the data should be reordered to:
* <p>
* __time, varchar_col1, bigint_col1, float_col1, varchar_col2
* 2022-12-26T12:34:56,extra,8,"50",2.0,fop
* 2022-12-26T12:34:56,extra,8,"40",2.0,foq
* 2022-12-26T12:34:56,extra,9,"30",2.0,foo
* 2022-12-26T12:34:56,extra,10,"20",2.0,foo
*
*/
@Test
public void testInsertWithMultiClusteringFromQuery() throws Exception
{
String queryFile = "/catalog/multiClustering_select.sql";
String tableName = "testWithMultiClusteringFromQuery" + operationName;
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col1", "VARCHAR")
.column("bigint_col1", "BIGINT")
.column("float_col1", "FLOAT")
.column("varchar_col2", "VARCHAR")
.build();
client.createTable(table, true);
LOG.info("table created:\n%s", client.readTable(table.id()));
String queryInline =
StringUtils.format(dmlPrefixPattern, tableName) + "\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS varchar_col1,\n"
+ " c AS bigint_col1,\n"
+ " e AS float_col1,\n"
+ " f AS varchar_col2\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\\n2022-12-26T12:34:56,extra,9,\\\"30\\\",2.0,foo\\n2022-12-26T12:34:56,extra,8,\\\"40\\\",2.0,foq\\n2022-12-26T12:34:56,extra,8,\\\"50\\\",2.0,fop\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "CLUSTERED BY \"bigint_col1\", \"varchar_col2\"\n";
// Submit the task and wait for the datasource to get loaded
LOG.info("Running query:\n%s", queryInline);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(queryInline);
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()
));
}
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(tableName);
msqHelper.testQueriesFromFile(queryFile, tableName);
}
}

View File

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.catalog;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.catalog.model.Columns;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.cluster.CatalogClient;
import org.apache.druid.testsEx.cluster.DruidClusterClient;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertTrue;
/**
* Tests that expect failures when ingestng data into catalog defined tables.
*/
@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogIngestErrorTest
{
@Inject
private MsqTestQueryHelper msqHelper;
@Inject
private DruidClusterClient clusterClient;
private CatalogClient client;
@Before
public void initializeClient()
{
client = new CatalogClient(clusterClient);
}
/**
* If the segment grain is absent in the catalog and absent in the PARTITIONED BY clause in the query, then
* validation error.
*/
@Test
public void testInsertNoPartitonedByFromCatalogOrQuery() throws ExecutionException, InterruptedException
{
String tableName = "testInsertNoPartitonedByFromCatalogOrQuery";
TableMetadata table = new TableBuilder(TableId.datasource(tableName), DatasourceDefn.TABLE_TYPE)
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col", "VARCHAR")
.column("bigint_col", "BIGINT")
.column("float_col", "FLOAT")
.build();
client.createTable(table, true);
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS varchar_col,\n"
+ " c AS bigint_col,\n"
+ " e AS float_col\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n",
tableName
);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Operation [INSERT] requires a PARTITIONED BY to be explicitly defined, but none was found.")
);
}
/**
* Adding a new column during ingestion that is not defined in a sealed table should fail with
* proper validation error.
*/
@Test
public void testInsertNonDefinedColumnIntoSealedCatalogTable() throws ExecutionException, InterruptedException
{
String tableName = "testInsertNonDefinedColumnIntoSealedCatalogTable";
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col", "VARCHAR")
.column("bigint_col", "BIGINT")
.column("float_col", "FLOAT")
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();
client.createTable(table, true);
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS varchar_col,\n"
+ " c AS bigint_col,\n"
+ " e AS float_col,\n"
+ " c AS extra\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
tableName
);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Column [extra] is not defined in the target table [druid.testInsertNonDefinedColumnIntoSealedCatalogTable] strict schema")
);
}
/**
* Assigning a column during ingestion, to an input type that is not compatible with the defined type of the
* column, should result in a proper validation error.
*/
@Test
public void testInsertWithIncompatibleTypeAssignment() throws ExecutionException, InterruptedException
{
String tableName = "testInsertWithIncompatibleTypeAssignment";
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col", "VARCHAR")
.column("bigint_col", "BIGINT")
.column("float_col", "FLOAT")
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();
client.createTable(table, true);
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " ARRAY[b] AS varchar_col,\n"
+ " c AS bigint_col,\n"
+ " e AS float_col\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
tableName
);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Cannot assign to target field 'varchar_col' of type VARCHAR from source field 'varchar_col' of type VARCHAR ARRAY (line [4], column [3])")
);
}
/**
* Assigning a complex type column during ingestion, to an input type that is not compatible with the defined type of
* the column, should result in a proper validation error.
*/
@Test
public void testInsertGroupByWithIncompatibleTypeAssignment() throws ExecutionException, InterruptedException
{
String tableName = "testInsertGroupByWithIncompatibleTypeAssignment";
TableMetadata table = TableBuilder.datasource(tableName, "P1D")
.column(Columns.TIME_COLUMN, Columns.LONG)
.column("varchar_col", "VARCHAR")
.column("bigint_col", "BIGINT")
.column("float_col", "FLOAT")
.column("hll_col", "COMPLEX<hyperUnique>")
.property(DatasourceDefn.SEALED_PROPERTY, true)
.build();
client.createTable(table, true);
String queryInline =
StringUtils.format(
"INSERT INTO %s\n"
+ "SELECT\n"
+ " TIME_PARSE(a) AS __time,\n"
+ " b AS varchar_col,\n"
+ " c AS bigint_col,\n"
+ " e AS float_col,\n"
+ " ARRAY[b] AS hll_col\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"inline\",\"data\":\"2022-12-26T12:34:56,extra,10,\\\"20\\\",2.0,foo\"}',\n"
+ " '{\"type\":\"csv\",\"findColumnsFromHeader\":false,\"columns\":[\"a\",\"b\",\"c\",\"d\",\"e\",\"f\"]}'\n"
+ " )\n"
+ ") "
+ " EXTEND (a VARCHAR, b VARCHAR, c BIGINT, d VARCHAR, e FLOAT, f VARCHAR)\n"
+ "PARTITIONED BY DAY\n",
tableName
);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskWithExpectedStatusCode(sqlQueryFromString(queryInline), null, null, HttpResponseStatus.BAD_REQUEST);
assertTrue(sqlTaskStatus.getError() != null && sqlTaskStatus.getError()
.getUnderlyingException()
.getMessage()
.equals(
"Cannot assign to target field 'hll_col' of type COMPLEX<hyperUnique> from source field 'hll_col' of type VARCHAR ARRAY (line [7], column [3])")
);
}
private static SqlQuery sqlQueryFromString(String queryString)
{
return new SqlQuery(queryString, null, false, false, false, ImmutableMap.of(), null);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.catalog;
import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogInsertAndQueryTest extends ITCatalogIngestAndQueryTest
{
@Override
public String getOperationName()
{
return "INSERT";
}
@Override
public String getDmlPrefixPattern()
{
return "INSERT INTO \"%s\"";
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.testsEx.catalog;
import org.apache.druid.testsEx.categories.Catalog;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@RunWith(DruidTestRunner.class)
@Category(Catalog.class)
public class ITCatalogReplaceAndQueryTest extends ITCatalogIngestAndQueryTest
{
@Override
public String getOperationName()
{
return "REPLACE";
}
@Override
public String getDmlPrefixPattern()
{
return "REPLACE INTO \"%s\" OVERWRITE ALL";
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.testsEx.catalog;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.catalog.http.TableEditRequest.DropColumns;
import org.apache.druid.catalog.http.TableEditRequest.HideColumns;
@ -28,6 +29,7 @@ import org.apache.druid.catalog.model.CatalogUtils;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.TableSpec;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.catalog.model.table.DatasourceDefn;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.java.util.common.ISE;
@ -106,6 +108,17 @@ public class ITCatalogRestTest
() -> client.createTable(table, false)
);
}
// DESC cluster keys not supported
{
final TableMetadata table = TableBuilder.datasource("foo", "P1D")
.property(DatasourceDefn.CLUSTER_KEYS_PROPERTY, ImmutableList.of(new ClusterKeySpec("clusterKeyA", true)))
.build();
assertThrows(
Exception.class,
() -> client.createTable(table, false)
);
}
}
/**

View File

@ -0,0 +1,23 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"bigint_col1": 8
},
{
"__time": 1672058096000,
"bigint_col1": 8
},
{
"__time": 1672058096000,
"bigint_col1": 9
},
{
"__time": 1672058096000,
"bigint_col1": 10
}
]
}
]

View File

@ -0,0 +1,23 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"double_col1": 8.0
},
{
"__time": 1672058096000,
"double_col1": 8.0
},
{
"__time": 1672058096000,
"double_col1": 9.0
},
{
"__time": 1672058096000,
"double_col1": 10.0
}
]
}
]

View File

@ -0,0 +1,35 @@
[
{
"query": "SELECT * FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1672058096000,
"bigint_col1": 8,
"varchar_col2": "fop",
"varchar_col1": "extra",
"float_col1": 2.0
},
{
"__time": 1672058096000,
"bigint_col1": 8,
"varchar_col2": "foq",
"varchar_col1": "extra",
"float_col1": 2.0
},
{
"__time": 1672058096000,
"bigint_col1": 9,
"varchar_col2": "foo",
"varchar_col1": "extra",
"float_col1": 2.0
},
{
"__time": 1672058096000,
"bigint_col1": 10,
"varchar_col2": "foo",
"varchar_col1": "extra",
"float_col1": 2.0
}
]
}
]

View File

@ -63,6 +63,7 @@ services:
service: druid-historical
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- AWS_REGION=us-west-2
depends_on:
- druid-zookeeper-kafka

View File

@ -121,14 +121,29 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
* and returns the status associated with the submitted task
*/
public SqlTaskStatus submitMsqTaskSuccesfully(SqlQuery sqlQuery, String username, String password) throws ExecutionException, InterruptedException
{
return submitMsqTaskWithExpectedStatusCode(sqlQuery, username, password, HttpResponseStatus.ACCEPTED);
}
/**
* Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the task to be accepted by the cluster
* and returns the status associated with the submitted task
*/
public SqlTaskStatus submitMsqTaskWithExpectedStatusCode(
SqlQuery sqlQuery,
String username,
String password,
HttpResponseStatus expectedResponseStatus
) throws ExecutionException, InterruptedException
{
StatusResponseHolder statusResponseHolder = submitMsqTask(sqlQuery, username, password);
// Check if the task has been accepted successfully
HttpResponseStatus httpResponseStatus = statusResponseHolder.getStatus();
if (!httpResponseStatus.equals(HttpResponseStatus.ACCEPTED)) {
if (!httpResponseStatus.equals(expectedResponseStatus)) {
throw new ISE(
StringUtils.format(
"Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]",
"Expected response status code [%d] when submitting task. Received response status code [%d], and response content:\n[%s]",
expectedResponseStatus.getCode(),
httpResponseStatus.getCode(),
statusResponseHolder.getContent()
)

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@ -95,9 +94,13 @@ public interface Supervisor
int getActiveTaskGroupsCount();
/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
}
}

View File

@ -129,6 +129,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.jsonMapper = jsonMapper;
this.dbTables = dbTables;
this.connector = connector;

View File

@ -23,14 +23,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator
private final String taskId;
private final OverlordClient overlordClient;
private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
private final Object lock = new Object();
@GuardedBy("lock")
@ -125,42 +123,15 @@ public class SpecificTaskServiceLocator implements ServiceLocator
lastUpdateTime = System.currentTimeMillis();
final TaskStatus status = taskStatusMap.get(taskId);
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
lastKnownState = null;
lastKnownLocation = null;
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
// Do not resolve the future just yet, try the fallback API instead
fetchFallbackTaskLocation();
} else {
lastKnownState = status.getStatusCode();
final TaskLocation location;
if (TaskLocation.unknown().equals(status.getLocation())) {
location = locationFetcher.getLocation();
} else {
location = status.getLocation();
}
if (TaskLocation.unknown().equals(location)) {
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
}
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
@ -168,17 +139,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator
@Override
public void onFailure(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
resolvePendingFutureOnException(t);
}
},
MoreExecutors.directExecutor()
Execs.directExecutor()
);
return Futures.nonCancellationPropagating(retVal);
@ -209,18 +173,104 @@ public class SpecificTaskServiceLocator implements ServiceLocator
}
}
private class TaskLocationFetcher
private void resolvePendingFuture(TaskState state, TaskLocation location)
{
TaskLocation getLocation()
{
final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
overlordClient.taskStatus(taskId),
true
);
if (statusResponse == null || statusResponse.getStatus() == null) {
return TaskLocation.unknown();
} else {
return statusResponse.getStatus().getLocation();
synchronized (lock) {
if (pendingFuture != null) {
lastKnownState = state;
lastKnownLocation = location == null ? null : new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
private void resolvePendingFutureOnException(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
/**
* Invokes the single task status API {@link OverlordClient#taskStatus} if the
* multi-task status API returns an unknown location (this can happen if the
* Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
*/
private void fetchFallbackTaskLocation()
{
synchronized (lock) {
if (pendingFuture != null) {
final ListenableFuture<TaskStatusResponse> taskStatusFuture;
try {
taskStatusFuture = overlordClient.taskStatus(taskId);
}
catch (Exception e) {
resolvePendingFutureOnException(e);
return;
}
pendingFuture.addListener(
() -> {
if (!taskStatusFuture.isDone()) {
// pendingFuture may resolve without taskStatusFuture due to close().
taskStatusFuture.cancel(true);
}
},
Execs.directExecutor()
);
Futures.addCallback(
taskStatusFuture,
new FutureCallback<TaskStatusResponse>()
{
@Override
public void onSuccess(final TaskStatusResponse taskStatusResponse)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();
final TaskStatusPlus status = taskStatusResponse.getStatus();
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
resolvePendingFuture(status.getStatusCode(), null);
} else {
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
}
}
}
@Override
public void onFailure(Throwable t)
{
resolvePendingFutureOnException(t);
}
},
Execs.directExecutor()
);
}
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
@ -97,7 +96,7 @@ public class NoopSupervisorSpecTest
{
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
Supervisor noOpSupervisor = expectedSpec.createSupervisor();
Assert.assertThrows(NotImplementedException.class,
Assert.assertThrows(UnsupportedOperationException.class,
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.expression.builtin;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.sql.calcite.expression.DirectOperatorConversion;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
public class DivOperatorConversion extends DirectOperatorConversion
{
private static final SqlOperator SQL_OPERATOR =
OperatorConversions
.operatorBuilder("DIV")
.operandTypeChecker(OperandTypes.DIVISION_OPERATOR)
.operandTypeInference(InferTypes.FIRST_KNOWN)
.returnTypeCascadeNullable(SqlTypeName.BIGINT)
.functionCategory(SqlFunctionCategory.NUMERIC)
.build();
public DivOperatorConversion()
{
super(SQL_OPERATOR, "div");
}
}

View File

@ -81,6 +81,7 @@ import org.apache.druid.sql.calcite.expression.builtin.ConcatOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ContainsOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DateTruncOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DecodeBase64UTFOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.DivOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.ExtractOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.FloorOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.GreatestOperatorConversion;
@ -369,6 +370,7 @@ public class DruidOperatorTable implements SqlOperatorTable
.add(new AliasedOperatorConversion(CHARACTER_LENGTH_CONVERSION, "STRLEN"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.CONCAT, "concat"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.EXP, "exp"))
.add(new DivOperatorConversion())
.add(new DirectOperatorConversion(SqlStdOperatorTable.DIVIDE_INTEGER, "div"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LN, "log"))
.add(new DirectOperatorConversion(SqlStdOperatorTable.LOWER, "lower"))

View File

@ -602,6 +602,38 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testDiv()
{
cannotVectorize();
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"select cnt, m1, div(m1, 2), div(cnt+2, cnt+1) from foo",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.virtualColumns(
expressionVirtualColumn("v0", "div(\"m1\",2)", ColumnType.LONG),
expressionVirtualColumn("v1", "div((\"cnt\" + 2),(\"cnt\" + 1))", ColumnType.LONG)
)
.columns(ImmutableList.of("cnt", "m1", "v0", "v1"))
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
ImmutableList.of(
new Object[]{1L, 1.0f, 0L, 1L},
new Object[]{1L, 2.0f, 1L, 1L},
new Object[]{1L, 3.0f, 1L, 1L},
new Object[]{1L, 4.0f, 2L, 1L},
new Object[]{1L, 5.0f, 2L, 1L},
new Object[]{1L, 6.0f, 3L, 1L}
)
);
}
@Test
public void testGroupByLimitWrappingOrderByAgg()
{