Merge remote-tracking branch 'apache/master' into quidem-record

This commit is contained in:
Zoltan Haindrich 2024-07-18 05:48:13 +00:00
commit 06b68b6c89
128 changed files with 2066 additions and 2266 deletions

View File

@ -914,13 +914,10 @@ Host: http://ROUTER_IP:ROUTER_PORT
### Get task segments
:::info
This API is deprecated and will be removed in future releases.
This API is not supported anymore and always returns a 404 response.
Use the metric `segment/added/bytes` instead to identify the segment IDs committed by a task.
:::
Retrieves information about segments generated by the task given the task ID. To hit this endpoint, make sure to enable the audit log config on the Overlord with `druid.indexer.auditLog.enabled = true`.
In addition to enabling audit logs, configure a cleanup strategy to prevent overloading the metadata store with old audit logs which may cause performance issues. To enable automated cleanup of audit logs on the Coordinator, set `druid.coordinator.kill.audit.on`. You may also manually export the audit logs to external storage. For more information, see [Audit records](../operations/clean-metadata-store.md#audit-records).
#### URL
`GET` `/druid/indexer/v1/task/{taskId}/segments`
@ -929,12 +926,14 @@ In addition to enabling audit logs, configure a cleanup strategy to prevent over
<Tabs>
<TabItem value="27" label="200 SUCCESS">
<TabItem value="27" label="404 NOT FOUND">
<br/>
*Successfully retrieved task segments*
```json
{
"error": "Segment IDs committed by a task action are not persisted anymore. Use the metric 'segment/added/bytes' to identify the segments created by a task."
}
```
</TabItem>
</Tabs>

View File

@ -44,7 +44,7 @@ This applies to all metadata entities in this topic except compaction configurat
You can configure the retention period for each metadata type, when available, through the record's `durationToRetain` property.
Certain records may require additional conditions be satisfied before clean up occurs.
See the [example](#example) for how you can customize the automated metadata cleanup for a specific use case.
See the [example](#example-configuration-for-automated-metadata-cleanup) for how you can customize the automated metadata cleanup for a specific use case.
## Automated cleanup strategies
@ -62,13 +62,12 @@ You can configure cleanup for each entity separately, as described in this secti
Define the properties in the `coordinator/runtime.properties` file.
The cleanup of one entity may depend on the cleanup of another entity as follows:
- You have to configure a [kill task for segment records](#kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records).
- You have to configure a [kill task for segment records](#segment-records-and-segments-in-deep-storage-kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records).
- You have to schedule the metadata management tasks to run at the same or higher frequency as your most frequent cleanup job. For example, if your most frequent cleanup job is every hour, set the metadata store management period to one hour or less: `druid.coordinator.period.metadataStoreManagementPeriod=P1H`.
For details on configuration properties, see [Metadata management](../configuration/index.md#metadata-management).
If you want to skip the details, check out the [example](#example) for configuring automated metadata cleanup.
If you want to skip the details, check out the [example](#example-configuration-for-automated-metadata-cleanup) for configuring automated metadata cleanup.
<a name="kill-task"></a>
### Segment records and segments in deep storage (kill task)
:::info
@ -110,7 +109,7 @@ Supervisor cleanup uses the following configuration:
### Rules records
Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#kill-task).
Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task).
Rule cleanup uses the following configuration:
- `druid.coordinator.kill.rule.on`: When `true`, enables cleanup for rules records.
@ -129,7 +128,7 @@ To prevent the configuration from being prematurely removed, wait for the dataso
Unlike other metadata records, compaction configuration records do not have a retention period set by `durationToRetain`. Druid deletes compaction configuration records at every cleanup cycle for inactive datasources, which do not have segments either used or unused.
Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#kill-task).
Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task).
Compaction configuration cleanup uses the following configuration:
- `druid.coordinator.kill.compaction.on`: When `true`, enables cleanup for compaction configuration records.
@ -153,7 +152,7 @@ Datasource cleanup uses the following configuration:
You can configure the Overlord to periodically delete indexer task logs and associated metadata. During cleanup, the Overlord removes the following:
* Indexer task logs from deep storage.
* Indexer task log metadata from the tasks and tasklogs tables in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` and `druid_tasklogs` by default). Druid no longer uses the tasklogs table, and the table is always empty.
* Indexer task log metadata from the tasks table in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` by default).
To configure cleanup of task logs by the Overlord, set the following properties in the `overlord/runtime.properties` file.
@ -188,7 +187,6 @@ druid.coordinator.kill.rule.on=false
druid.coordinator.kill.datasource.on=false
```
<a name="example"></a>
## Example configuration for automated metadata cleanup
Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:

View File

@ -158,10 +158,12 @@
<excludes>
<!-- Initialization code -->
<exclude>org/apache/druid/k8s/discovery/K8sDiscoveryModule*</exclude>
<exclude>org/apache/druid/k8s/discovery/K8sDruidLeaderSelectorProvider*</exclude>
<!-- K8S Api Glue, not unit testable -->
<exclude>org/apache/druid/k8s/discovery/DefaultK8sApiClient*</exclude>
<exclude>org/apache/druid/k8s/discovery/DefaultK8sLeaderElectorFactory*</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -21,9 +21,7 @@ package org.apache.druid.k8s.discovery;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.Config;
import org.apache.druid.client.coordinator.Coordinator;
@ -34,9 +32,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.DruidNode;
import java.io.IOException;
import java.util.Collections;
@ -88,65 +84,15 @@ public class K8sDiscoveryModule implements DruidModule
PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class))
.addBinding(K8S_KEY)
.toProvider(
new DruidLeaderSelectorProvider(true)
K8sDruidLeaderSelectorProvider.K8sCoordinatorDruidLeaderSelectorProvider.class
)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class))
.addBinding(K8S_KEY)
.toProvider(
new DruidLeaderSelectorProvider(false)
K8sDruidLeaderSelectorProvider.K8sIndexingServiceDruidLeaderSelectorProvider.class
)
.in(LazySingleton.class);
}
private static class DruidLeaderSelectorProvider implements Provider<DruidLeaderSelector>
{
@Inject
@Self
private DruidNode druidNode;
@Inject
private PodInfo podInfo;
@Inject
private K8sDiscoveryConfig discoveryConfig;
@Inject
private Provider<ApiClient> k8sApiClientProvider;
private boolean isCoordinator;
DruidLeaderSelectorProvider(boolean isCoordinator)
{
this.isCoordinator = isCoordinator;
}
@Override
public DruidLeaderSelector get()
{
// Note: these can not be setup in the constructor because injected K8sDiscoveryConfig and PodInfo
// are not available at that time.
String lockResourceName;
String lockResourceNamespace;
if (isCoordinator) {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-coordinator";
lockResourceNamespace = discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace() == null ?
podInfo.getPodNamespace() : discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace();
} else {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-overlord";
lockResourceNamespace = discoveryConfig.getOverlordLeaderElectionConfigMapNamespace() == null ?
podInfo.getPodNamespace() : discoveryConfig.getOverlordLeaderElectionConfigMapNamespace();
}
return new K8sDruidLeaderSelector(
druidNode,
lockResourceName,
lockResourceNamespace,
discoveryConfig,
new DefaultK8sLeaderElectorFactory(k8sApiClientProvider.get(), discoveryConfig)
);
}
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.k8s.discovery;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.kubernetes.client.openapi.ApiClient;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.server.DruidNode;
public abstract class K8sDruidLeaderSelectorProvider implements Provider<DruidLeaderSelector>
{
@Inject
@Self
private DruidNode druidNode;
@Inject
private PodInfo podInfo;
@Inject
private K8sDiscoveryConfig discoveryConfig;
@Inject
private Provider<ApiClient> k8sApiClientProvider;
private boolean isCoordinator;
K8sDruidLeaderSelectorProvider(boolean isCoordinator)
{
this.isCoordinator = isCoordinator;
}
@Override
public DruidLeaderSelector get()
{
// Note: these can not be setup in the constructor because injected K8sDiscoveryConfig and PodInfo
// are not available at that time.
String lockResourceName;
String lockResourceNamespace;
if (isCoordinator) {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-coordinator";
lockResourceNamespace = discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace() == null
?
podInfo.getPodNamespace()
: discoveryConfig.getCoordinatorLeaderElectionConfigMapNamespace();
} else {
lockResourceName = discoveryConfig.getClusterIdentifier() + "-leaderelection-overlord";
lockResourceNamespace = discoveryConfig.getOverlordLeaderElectionConfigMapNamespace() == null ?
podInfo.getPodNamespace() : discoveryConfig.getOverlordLeaderElectionConfigMapNamespace();
}
return new K8sDruidLeaderSelector(
druidNode,
lockResourceName,
lockResourceNamespace,
discoveryConfig,
new DefaultK8sLeaderElectorFactory(k8sApiClientProvider.get(), discoveryConfig)
);
}
static class K8sCoordinatorDruidLeaderSelectorProvider extends K8sDruidLeaderSelectorProvider
{
@Inject
public K8sCoordinatorDruidLeaderSelectorProvider()
{
super(true);
}
}
static class K8sIndexingServiceDruidLeaderSelectorProvider extends K8sDruidLeaderSelectorProvider
{
@Inject
public K8sIndexingServiceDruidLeaderSelectorProvider()
{
super(false);
}
}
}

View File

@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final WindowOperatorQuery query;
private final List<OperatorFactory> operatorFactoryList;
private final List<String> partitionColumnNames;
private final ObjectMapper jsonMapper;
private final ArrayList<RowsAndColumns> frameRowsAndCols;
private final ArrayList<RowsAndColumns> resultRowAndCols;
@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameReader frameReader;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
List<Integer> partitionColsIndex;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
@ -97,7 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final List<OperatorFactory> operatorFactoryList,
final RowSignature rowSignature,
final boolean isOverEmpty,
final int maxRowsMaterializedInWindow
final int maxRowsMaterializedInWindow,
final List<String> partitionColumnNames
)
{
this.inputChannel = inputChannel;
@ -110,9 +110,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.partitionColsIndex = new ArrayList<>();
this.isOverEmpty = isOverEmpty;
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}
@Override
@ -177,12 +177,12 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
*
* Future thoughts: {@link https://github.com/apache/druid/issues/16126}
*
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
* say 20k rows and keep on adding to the same pending frame and not create a new frame
*
* 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
*/
@ -218,7 +218,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
partitionColsIndex = findPartitionColumns(frameReader.signature());
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
@ -259,18 +258,17 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// if they have the same partition key
// keep adding them after checking
// guardrails
objectsOfASingleRac.add(currentRow);
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
objectsOfASingleRac.add(currentRow);
} else {
// key change noted
// create rac from the rows seen before
@ -484,37 +482,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
frameRowsAndCols.add(ldrc);
}
private List<Integer> findPartitionColumns(RowSignature rowSignature)
{
List<Integer> indexList = new ArrayList<>();
for (OperatorFactory of : operatorFactoryList) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
indexList.add(rowSignature.indexOf(s));
}
}
}
return indexList;
}
/**
*
* Compare two rows based only the columns in the partitionIndices
* In case the parition indices is empty or null compare entire row
*
* Compare two rows based on the columns in partitionColumnNames.
* If the partitionColumnNames is empty or null, compare entire row.
* <p>
* For example, say:
* <ul>
* <li>partitionColumnNames = ["d1", "d2"]</li>
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
* <li>frameReader.signature.indexOf("d1") = 0</li>
* <li>frameReader.signature.indexOf("d2") = 1</li>
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
* </ul>
* <p>
* Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*/
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
{
if (partitionIndices == null || partitionIndices.isEmpty()) {
if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
return row1.equals(row2);
} else {
int match = 0;
for (int i : partitionIndices) {
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
if (Objects.equals(row1.get(i), row2.get(i))) {
match++;
}
}
return match == partitionIndices.size();
return match == partitionColumnNames.size();
}
}
}

View File

@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final RowSignature stageRowSignature;
private final boolean isEmptyOver;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;
@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@ -68,7 +69,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("emptyOver") boolean emptyOver,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
)
{
this.query = Preconditions.checkNotNull(query, "query");
@ -76,6 +78,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.isEmptyOver = emptyOver;
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}
@JsonProperty("query")
@ -90,6 +93,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
return operatorList;
}
@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
return partitionColumnNames;
}
@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
@ -148,7 +157,6 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
@ -159,7 +167,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
operatorList,
stageRowSignature,
isEmptyOver,
maxRowsMaterializedInWindow
maxRowsMaterializedInWindow,
partitionColumnNames
);
}
);
@ -185,12 +194,13 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}
@Override
public int hashCode()
{
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
}
}

View File

@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.key.KeyOrder;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
import org.apache.druid.msq.kernel.ShuffleSpec;
@ -39,6 +42,7 @@ import org.apache.druid.query.operator.NaiveSortOperatorFactory;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.operator.window.WindowOperatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.util.ArrayList;
@ -48,6 +52,7 @@ import java.util.Map;
public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
{
private static final Logger log = new Logger(WindowOperatorQueryKit.class);
private final ObjectMapper jsonMapper;
public WindowOperatorQueryKit(ObjectMapper jsonMapper)
@ -65,13 +70,22 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
int minStageNumber
)
{
// need to validate query first
// populate the group of operators to be processed as each stage
// the size of the operators is the number of serialized stages
// later we should also check if these can be parallelized
// check there is an empty over clause or not
List<List<OperatorFactory>> operatorList = new ArrayList<>();
boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList);
// Need to validate query first.
// Populate the group of operators to be processed at each stage.
// The size of the operators is the number of serialized stages.
// Later we should also check if these can be parallelized.
// Check if there is an empty OVER() clause or not.
RowSignature rowSignature = originalQuery.getRowSignature();
log.info("Row signature received for query is [%s].", rowSignature);
boolean isEmptyOverPresent = originalQuery.getOperators()
.stream()
.filter(of -> of instanceof NaivePartitioningOperatorFactory)
.map(of -> (NaivePartitioningOperatorFactory) of)
.anyMatch(of -> of.getPartitionColumns().isEmpty());
List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
log.info("Created operatorList with operator factories: [%s]", operatorList);
ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
// add this shuffle spec to the last stage of the inner query
@ -102,16 +116,14 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
final int maxRowsMaterialized;
RowSignature rowSignature = queryToRun.getRowSignature();
if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
maxRowsMaterialized = (int) originalQuery.context()
.get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
}
if (isEmptyOverFound) {
if (isEmptyOverPresent) {
// empty over clause found
// moving everything to a single partition
queryDefBuilder.add(
@ -125,28 +137,59 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
queryToRun.getOperators(),
rowSignature,
true,
maxRowsMaterialized
maxRowsMaterialized,
new ArrayList<>()
))
);
} else {
// there are multiple windows present in the query
// Create stages for each window in the query
// These stages will be serialized
// the partition by clause of the next window will be the shuffle key for the previous window
// There are multiple windows present in the query.
// Create stages for each window in the query.
// These stages will be serialized.
// The partition by clause of the next window will be the shuffle key for the previous window.
RowSignature.Builder bob = RowSignature.builder();
final int numberOfWindows = operatorList.size();
final int baseSize = rowSignature.size() - numberOfWindows;
for (int i = 0; i < baseSize; i++) {
bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
log.info("Row signature received from last stage is [%s].", signatureFromInput);
for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
}
List<String> partitionColumnNames = new ArrayList<>();
/*
operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
to be used for a different window stage.
We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
*/
for (int i = 0; i < operatorList.size(); i++) {
for (OperatorFactory operatorFactory : operatorList.get(i)) {
if (operatorFactory instanceof WindowOperatorFactory) {
List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
// Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
// since they need to be present in the row signature for this window stage.
for (String columnName : outputColumnNames) {
int indexInRowSignature = rowSignature.indexOf(columnName);
if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
bob.add(columnName, columnType);
log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
} else {
throw new ISE(
"Found unexpected column [%s] already present in row signature [%s].",
columnName,
rowSignature
);
}
}
}
}
for (int i = 0; i < numberOfWindows; i++) {
bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
// find the shuffle spec of the next stage
// if it is the last stage set the next shuffle spec to single partition
if (i + 1 == numberOfWindows) {
nextShuffleSpec = ShuffleSpecFactories.singlePartition()
.build(ClusterBy.none(), false);
if (i + 1 == operatorList.size()) {
nextShuffleSpec = MixShuffleSpec.instance();
} else {
nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);
}
@ -162,6 +205,28 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
);
}
log.info("Using row signature [%s] for window stage.", stageRowSignature);
boolean partitionOperatorExists = false;
List<String> currentPartitionColumns = new ArrayList<>();
for (OperatorFactory of : operatorList.get(i)) {
if (of instanceof NaivePartitioningOperatorFactory) {
for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
currentPartitionColumns.add(s);
partitionOperatorExists = true;
}
}
}
if (partitionOperatorExists) {
partitionColumnNames = currentPartitionColumns;
}
log.info(
"Columns which would be used to define partitioning boundaries for this window stage are [%s]",
partitionColumnNames
);
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + i)
.inputs(new StageInputSpec(firstStageNumber + i - 1))
@ -173,7 +238,8 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
operatorList.get(i),
stageRowSignature,
false,
maxRowsMaterialized
maxRowsMaterialized,
partitionColumnNames
))
);
}
@ -184,14 +250,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
/**
*
* @param originalQuery
* @param operatorList
* @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise
* @return A list of list of operator factories, where each list represents the operator factories for a particular
* window stage.
*/
private boolean ifEmptyOverPresentInWindowOperstors(
WindowOperatorQuery originalQuery,
List<List<OperatorFactory>> operatorList
)
private List<List<OperatorFactory>> getOperatorListFromQuery(WindowOperatorQuery originalQuery)
{
List<List<OperatorFactory>> operatorList = new ArrayList<>();
final List<OperatorFactory> operators = originalQuery.getOperators();
List<OperatorFactory> operatorFactoryList = new ArrayList<>();
for (OperatorFactory of : operators) {
@ -203,18 +267,17 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) {
operatorList.clear();
operatorList.add(originalQuery.getOperators());
return true;
return operatorList;
}
}
}
return false;
return operatorList;
}
private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int maxWorkerCount)
{
NaivePartitioningOperatorFactory partition = null;
NaiveSortOperatorFactory sort = null;
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (OperatorFactory of : operatorFactories) {
if (of instanceof NaivePartitioningOperatorFactory) {
partition = (NaivePartitioningOperatorFactory) of;
@ -222,29 +285,31 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
sort = (NaiveSortOperatorFactory) of;
}
}
Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
if (sort != null) {
for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
}
}
assert partition != null;
if (partition.getPartitionColumns().isEmpty()) {
if (partition == null || partition.getPartitionColumns().isEmpty()) {
// If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
// This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
return null;
}
List<KeyColumn> keyColsOfWindow = new ArrayList<>();
for (String partitionColumn : partition.getPartitionColumns()) {
KeyColumn kc;
if (colMap.containsKey(partitionColumn)) {
if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
} else {
if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
}
} else {
kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
}
keyColsOfWindow.add(kc);
}
return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
}
}

View File

@ -17,30 +17,19 @@
* under the License.
*/
package org.apache.druid.indexing.common.actions;
package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
/**
* The configuration for task audit logging.
* This class will be removed in future releases. See https://github.com/apache/druid/issues/5859.
*/
@Deprecated
public class TaskAuditLogConfig
public class WindowOperatorQueryFrameProcessorFactoryTest
{
@JsonProperty
private final boolean enabled;
@JsonCreator
public TaskAuditLogConfig(@JsonProperty("enabled") boolean enabled)
@Test
public void testEqualsAndHashcode()
{
this.enabled = enabled;
}
@JsonProperty("enabled")
public boolean isEnabled()
{
return enabled;
EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
.withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames")
.usingGetClass()
.verify();
}
}

View File

@ -39,6 +39,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@ -91,6 +94,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
@ -205,6 +210,17 @@ public class CalciteMSQTestsHelper
{
final QueryableIndex index;
switch (segmentId.getDataSource()) {
case WIKIPEDIA:
try {
final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID()));
final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex();
TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null);
index = TestIndex.INDEX_IO.loadIndex(directory);
}
catch (Exception e) {
throw new RuntimeException(e);
}
break;
case DATASOURCE1:
IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder()
.withMetrics(

View File

@ -121,12 +121,6 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -22,8 +22,6 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@ -37,45 +35,21 @@ public class LocalTaskActionClient implements TaskActionClient
private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);
private final Task task;
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;
public LocalTaskActionClient(
Task task,
TaskStorage storage,
TaskActionToolbox toolbox,
TaskAuditLogConfig auditLogConfig
TaskActionToolbox toolbox
)
{
this.task = task;
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
log.debug("Performing action for task[%s]: %s", task.getId(), taskAction);
if (auditLogConfig.isEnabled() && taskAction.isAudited()) {
// Add audit log
try {
final long auditLogStartTime = System.currentTimeMillis();
storage.addAuditLog(task, taskAction);
emitTimerMetric("task/action/log/time", taskAction, System.currentTimeMillis() - auditLogStartTime);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
.addData("actionClass", actionClass)
.emit();
throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
final long performStartTime = System.currentTimeMillis();
final RetType result = performAction(taskAction);
emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime);

View File

@ -21,27 +21,22 @@ package org.apache.druid.indexing.common.actions;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage;
/**
*/
public class LocalTaskActionClientFactory implements TaskActionClientFactory
{
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;
@Inject
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox, TaskAuditLogConfig auditLogConfig)
public LocalTaskActionClientFactory(TaskActionToolbox toolbox)
{
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}
@Override
public TaskActionClient create(Task task)
{
return new LocalTaskActionClient(task, storage, toolbox, auditLogConfig);
return new LocalTaskActionClient(task, toolbox);
}
}

View File

@ -39,12 +39,6 @@ public class LockListAction implements TaskAction<List<TaskLock>>
return toolbox.getTaskLockbox().findLocksForTask(task);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -56,12 +56,6 @@ public class LockReleaseAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -67,9 +67,4 @@ public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
}
@Override
public boolean isAudited()
{
return true;
}
}

View File

@ -64,12 +64,6 @@ public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -74,12 +74,6 @@ public class RetrieveSegmentsByIdAction implements TaskAction<Set<DataSegment>>
.retrieveSegmentsById(dataSource, segmentIds);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public boolean equals(Object o)
{

View File

@ -101,12 +101,6 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
.retrieveUnusedSegmentsForInterval(dataSource, interval, versions, limit, maxUsedStatusLastUpdatedTime);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -73,12 +73,6 @@ public class RetrieveUpgradedFromSegmentIdsAction implements TaskAction<Upgraded
);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -78,12 +78,6 @@ public class RetrieveUpgradedToSegmentIdsAction implements TaskAction<UpgradedTo
);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -186,12 +186,6 @@ public class RetrieveUsedSegmentsAction implements TaskAction<Collection<DataSeg
.retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public boolean equals(Object o)
{

View File

@ -386,12 +386,6 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Set;
/**
* Word of warning: Very large "segments" sets can cause oversized audit log entries, which is bad because it means
* that the task cannot actually complete. Callers should avoid this by avoiding inserting too many segments in the
* same action.
*/
public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
private final Set<DataSegment> segments;
@Nullable
private final SegmentSchemaMapping segmentSchemaMapping;
@JsonCreator
public SegmentInsertAction(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("segmentSchemaMapping") @Nullable SegmentSchemaMapping segmentSchemaMapping
)
{
this.segments = ImmutableSet.copyOf(segments);
this.segmentSchemaMapping = segmentSchemaMapping;
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
@Nullable
public SegmentSchemaMapping getSegmentSchemaMapping()
{
return segmentSchemaMapping;
}
@Override
public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Set<DataSegment>>()
{
};
}
/**
* Behaves similarly to
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments},
* with startMetadata and endMetadata both null.
*/
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return SegmentTransactionalInsertAction.appendAction(segments, null, null, segmentSchemaMapping).perform(task, toolbox).getSegments();
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "SegmentInsertAction{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
'}';
}
}

View File

@ -119,12 +119,6 @@ public class SegmentLockAcquireAction implements TaskAction<LockResult>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -102,12 +102,6 @@ public class SegmentLockTryAcquireAction implements TaskAction<List<LockResult>>
.collect(Collectors.toList());
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -99,12 +99,6 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -101,12 +101,6 @@ public class SegmentNukeAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -44,7 +44,6 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
*
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
@ -209,12 +208,6 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
return retVal;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -304,12 +304,6 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
return segmentsMap;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -187,12 +187,6 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -73,12 +73,6 @@ public class SurrogateAction<ReturnType, ActionType extends TaskAction<ReturnTyp
}
}
@Override
public boolean isAudited()
{
return taskAction.isAudited();
}
@Override
public String toString()
{

View File

@ -34,7 +34,6 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "segmentLockAcquire", value = SegmentLockAcquireAction.class),
@JsonSubTypes.Type(name = "lockList", value = LockListAction.class),
@JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class),
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@ -61,8 +60,6 @@ public interface TaskAction<RetType>
RetType perform(Task task, TaskActionToolbox toolbox);
boolean isAudited();
default boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return false;

View File

@ -97,12 +97,6 @@ public class TimeChunkLockAcquireAction implements TaskAction<TaskLock>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -81,12 +81,6 @@ public class TimeChunkLockTryAcquireAction implements TaskAction<TaskLock>
return result.getTaskLock();
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -63,12 +63,6 @@ public class UpdateLocationAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -85,12 +85,6 @@ public class UpdateStatusAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -327,24 +327,6 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
@Deprecated
@Override
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
{
synchronized (taskActions) {
taskActions.put(task.getId(), taskAction);
}
}
@Deprecated
@Override
public List<TaskAction> getAuditLogs(String taskid)
{
synchronized (taskActions) {
return ImmutableList.copyOf(taskActions.get(taskid));
}
}
private static class TaskStuff
{
final Task task;

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.TaskLookup;
import org.joda.time.Interval;
import java.util.Comparator;
@ -32,16 +33,16 @@ import java.util.Optional;
public class IndexerMetadataStorageAdapter
{
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskStorage taskStorage;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@Inject
public IndexerMetadataStorageAdapter(
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator
)
{
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskStorage = taskStorage;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
}
@ -49,8 +50,8 @@ public class IndexerMetadataStorageAdapter
{
// Find the earliest active task created for the specified datasource; if one exists,
// check if its interval overlaps with the delete interval.
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorageQueryAdapter
.getActiveTaskInfo(dataSource)
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorage
.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource)
.stream()
.min(Comparator.comparing(TaskInfo::getCreatedTime));

View File

@ -75,14 +75,6 @@ public class MetadataTaskStorage implements TaskStorage
};
}
@Override
public TypeReference<TaskAction> getLogType()
{
return new TypeReference<TaskAction>()
{
};
}
@Override
public TypeReference<TaskLock> getLockType()
{
@ -319,24 +311,6 @@ public class MetadataTaskStorage implements TaskStorage
);
}
@Deprecated
@Override
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
Preconditions.checkNotNull(taskAction, "taskAction");
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
handler.addLog(task.getId(), taskAction);
}
@Deprecated
@Override
public List<TaskAction> getAuditLogs(final String taskId)
{
return handler.getLogs(taskId);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return handler.getLocks(taskid);

View File

@ -24,43 +24,34 @@ import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Wraps a {@link TaskStorage}, providing a useful collection of read-only methods.
* Provides read-only methods to fetch information related to tasks.
* This class may serve information that is cached in memory in {@link TaskQueue}
* or {@link TaskLockbox}. If not present in memory, then the underlying
* {@link TaskStorage} is queried.
*/
public class TaskStorageQueryAdapter
public class TaskQueryTool
{
private final TaskStorage storage;
private final TaskLockbox taskLockbox;
private final Optional<TaskQueue> taskQueue;
private final TaskMaster taskMaster;
@Inject
public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
public TaskQueryTool(TaskStorage storage, TaskLockbox taskLockbox, TaskMaster taskMaster)
{
this.storage = storage;
this.taskLockbox = taskLockbox;
this.taskQueue = taskMaster.getTaskQueue();
}
public List<Task> getActiveTasks()
{
return storage.getActiveTasks();
this.taskMaster = taskMaster;
}
/**
@ -91,7 +82,7 @@ public class TaskStorageQueryAdapter
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
return storage.getTaskInfos(
ActiveTaskLookup.getInstance(),
TaskLookup.activeTasksOnly(),
dataSource
);
}
@ -104,20 +95,21 @@ public class TaskStorageQueryAdapter
return storage.getTaskStatusPlusList(taskLookups, dataSource);
}
public Optional<Task> getTask(final String taskid)
public Optional<Task> getTask(final String taskId)
{
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskid);
Optional<Task> activeTask = taskQueue.get().getActiveTask(taskId);
if (activeTask.isPresent()) {
return activeTask;
}
}
return storage.getTask(taskid);
return storage.getTask(taskId);
}
public Optional<TaskStatus> getStatus(final String taskid)
public Optional<TaskStatus> getStatus(final String taskId)
{
return storage.getStatus(taskid);
return storage.getStatus(taskId);
}
@Nullable
@ -126,27 +118,4 @@ public class TaskStorageQueryAdapter
return storage.getTaskInfo(taskId);
}
/**
* Returns all segments created by this task.
*
* This method is useful when you want to figure out all of the things a single task spawned. It does pose issues
* with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your
* own risk and know that at some point, we might adjust this to actually enforce some sort of limits.
*
* @param taskid task ID
* @return set of segments created by the specified task
*/
@Deprecated
public Set<DataSegment> getInsertedSegments(final String taskid)
{
final Set<DataSegment> segments = new HashSet<>();
for (final TaskAction action : storage.getAuditLogs(taskid)) {
if (action instanceof SegmentInsertAction) {
segments.addAll(((SegmentInsertAction) action).getSegments());
} else if (action instanceof SegmentTransactionalInsertAction) {
segments.addAll(((SegmentTransactionalInsertAction) action).getSegments());
}
}
return segments;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -110,26 +109,6 @@ public interface TaskStorage
@Nullable
TaskInfo<Task, TaskStatus> getTaskInfo(String taskId);
/**
* Add an action taken by a task to the audit log.
*
* @param task task to record action for
* @param taskAction task action to record
* @param <T> task action return type
*/
@Deprecated
<T> void addAuditLog(Task task, TaskAction<T> taskAction);
/**
* Returns all actions taken by a task.
*
* @param taskid task ID
*
* @return list of task actions
*/
@Deprecated
List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.

View File

@ -47,10 +47,10 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
@ -84,7 +84,6 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -126,7 +125,7 @@ public class OverlordResource
private static final Logger log = new Logger(OverlordResource.class);
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskQueryTool taskQueryTool;
private final IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
@ -163,7 +162,7 @@ public class OverlordResource
@Inject
public OverlordResource(
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskQueryTool taskQueryTool,
IndexerMetadataStorageAdapter indexerMetadataStorageAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
@ -175,7 +174,7 @@ public class OverlordResource
)
{
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskQueryTool = taskQueryTool;
this.indexerMetadataStorageAdapter = indexerMetadataStorageAdapter;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
@ -285,7 +284,7 @@ public class OverlordResource
}
// Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
}
@POST
@ -299,7 +298,7 @@ public class OverlordResource
}
// Build the response
return Response.ok(taskStorageQueryAdapter.getLockedIntervals(lockFilterPolicies)).build();
return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
}
@GET
@ -310,7 +309,7 @@ public class OverlordResource
{
final TaskPayloadResponse response = new TaskPayloadResponse(
taskid,
taskStorageQueryAdapter.getTask(taskid).orNull()
taskQueryTool.getTask(taskid).orNull()
);
final Response.Status status = response.getPayload() == null
@ -326,7 +325,7 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskStatus(@PathParam("taskid") String taskid)
{
final TaskInfo<Task, TaskStatus> taskInfo = taskStorageQueryAdapter.getTaskInfo(taskid);
final TaskInfo<Task, TaskStatus> taskInfo = taskQueryTool.getTaskInfo(taskid);
TaskStatusResponse response = null;
if (taskInfo != null) {
@ -400,8 +399,12 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskSegments(@PathParam("taskid") String taskid)
{
final Set<DataSegment> segments = taskStorageQueryAdapter.getInsertedSegments(taskid);
return Response.ok().entity(segments).build();
final String errorMsg =
"Segment IDs committed by a task action are not persisted anymore."
+ " Use the metric 'segment/added/bytes' to identify the segments created by a task.";
return Response.status(Status.NOT_FOUND)
.entity(Collections.singletonMap("error", errorMsg))
.build();
}
@POST
@ -437,7 +440,7 @@ public class OverlordResource
@Override
public Response apply(TaskQueue taskQueue)
{
final List<TaskInfo<Task, TaskStatus>> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
final List<TaskInfo<Task, TaskStatus>> tasks = taskQueryTool.getActiveTaskInfo(dataSource);
if (tasks.isEmpty()) {
return Response.status(Status.NOT_FOUND).build();
} else {
@ -468,7 +471,7 @@ public class OverlordResource
if (taskQueue.isPresent()) {
optional = taskQueue.get().getTaskStatus(taskId);
} else {
optional = taskStorageQueryAdapter.getStatus(taskId);
optional = taskQueryTool.getStatus(taskId);
}
if (optional.isPresent()) {
result.put(taskId, optional.get());
@ -863,7 +866,7 @@ public class OverlordResource
throw new IAE("Unknown state: [%s]", state);
}
final Stream<TaskStatusPlus> taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList(
final Stream<TaskStatusPlus> taskStatusPlusStream = taskQueryTool.getTaskStatusPlusList(
taskLookups,
dataSource
).stream();

View File

@ -26,7 +26,7 @@ import com.google.inject.Inject;
import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.http.security.AbstractResourceFilter;
import org.apache.druid.server.security.Access;
@ -49,16 +49,16 @@ import javax.ws.rs.core.Response;
*/
public class TaskResourceFilter extends AbstractResourceFilter
{
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskQueryTool taskQueryTool;
@Inject
public TaskResourceFilter(
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskQueryTool taskQueryTool,
AuthorizerMapper authorizerMapper
)
{
super(authorizerMapper);
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskQueryTool = taskQueryTool;
}
@Override
@ -76,7 +76,7 @@ public class TaskResourceFilter extends AbstractResourceFilter
IdUtils.validateId("taskId", taskId);
Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
Optional<Task> taskOptional = taskQueryTool.getTask(taskId);
if (!taskOptional.isPresent()) {
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND)

View File

@ -1,154 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.Set;
public class SegmentInsertActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private static final String DATA_SOURCE = "none";
private static final Interval INTERVAL = Intervals.of("2020/2020T01");
private static final String PARTY_YEAR = "1999";
private static final String THE_DISTANT_FUTURE = "3000";
private static final DataSegment SEGMENT1 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(0),
9,
1024
);
private static final DataSegment SEGMENT2 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(1),
9,
1024
);
private static final DataSegment SEGMENT3 = new DataSegment(
DATA_SOURCE,
INTERVAL,
THE_DISTANT_FUTURE,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(1),
9,
1024
);
private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
throws InterruptedException
{
return actionTestKit.getTaskLockbox().lock(task, new TimeChunkLockRequest(lockType, task, interval, null), timeoutMs);
}
@Test
public void testSimple() throws Exception
{
final Task task = NoopTask.create();
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2), null);
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singleton(INTERVAL),
CriticalAction.builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assertions.assertThat(
actionTestKit.getMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2);
}
@Test
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3), null);
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
final Set<DataSegment> segments = actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singleton(INTERVAL),
CriticalAction.<Set<DataSegment>>builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assert.assertEquals(ImmutableSet.of(SEGMENT3), segments);
}
}

View File

@ -24,7 +24,6 @@ import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.overlord.TaskStorage;
import java.util.concurrent.ConcurrentHashMap;
@ -42,7 +41,7 @@ public class CountingLocalTaskActionClientForTest implements TaskActionClient
TaskActionToolbox toolbox
)
{
delegate = new LocalTaskActionClient(task, storage, toolbox, new TaskAuditLogConfig(false));
delegate = new LocalTaskActionClient(task, toolbox);
}
@Override

View File

@ -43,7 +43,6 @@ import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -353,7 +352,8 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
public class TestLocalTaskActionClient extends CountingLocalTaskActionClientForTest
{
private final Set<DataSegment> publishedSegments = new HashSet<>();
private SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
private final SegmentSchemaMapping segmentSchemaMapping
= new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
private TestLocalTaskActionClient(Task task)
{
@ -365,11 +365,9 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
final RetType result = super.submit(taskAction);
if (taskAction instanceof SegmentTransactionalInsertAction) {
publishedSegments.addAll(((SegmentTransactionalInsertAction) taskAction).getSegments());
segmentSchemaMapping.merge(((SegmentTransactionalInsertAction) taskAction).getSegmentSchemaMapping());
} else if (taskAction instanceof SegmentInsertAction) {
publishedSegments.addAll(((SegmentInsertAction) taskAction).getSegments());
segmentSchemaMapping.merge(((SegmentInsertAction) taskAction).getSegmentSchemaMapping());
SegmentTransactionalInsertAction insertAction = (SegmentTransactionalInsertAction) taskAction;
publishedSegments.addAll(insertAction.getSegments());
segmentSchemaMapping.merge(insertAction.getSegmentSchemaMapping());
}
return result;
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.TaskLookup;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
@ -39,7 +40,7 @@ import java.util.List;
public class IndexerMetadataStorageAdapterTest
{
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@ -47,9 +48,9 @@ public class IndexerMetadataStorageAdapterTest
public void setup()
{
indexerMetadataStorageCoordinator = EasyMock.strictMock(IndexerMetadataStorageCoordinator.class);
taskStorageQueryAdapter = EasyMock.strictMock(TaskStorageQueryAdapter.class);
taskStorage = EasyMock.strictMock(TaskStorage.class);
indexerMetadataStorageAdapter = new IndexerMetadataStorageAdapter(
taskStorageQueryAdapter,
taskStorage,
indexerMetadataStorageCoordinator
);
}
@ -73,7 +74,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create()
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource")).andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock
@ -84,7 +85,7 @@ public class IndexerMetadataStorageAdapterTest
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
Assert.assertEquals(10, indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval));
}
@ -109,7 +110,8 @@ public class IndexerMetadataStorageAdapterTest
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock
@ -120,7 +122,7 @@ public class IndexerMetadataStorageAdapterTest
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
MatcherAssert.assertThat(
Assert.assertThrows(
@ -155,7 +157,8 @@ public class IndexerMetadataStorageAdapterTest
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
EasyMock.expect(taskStorage.getTaskInfos(TaskLookup.activeTasksOnly(), "dataSource"))
.andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
EasyMock
@ -166,7 +169,7 @@ public class IndexerMetadataStorageAdapterTest
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
EasyMock.replay(taskStorage, indexerMetadataStorageCoordinator);
MatcherAssert.assertThat(
Assert.assertThrows(

View File

@ -20,14 +20,13 @@
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
@ -38,6 +37,7 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import java.util.Collections;
import java.util.List;
/**
@ -97,18 +97,7 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
// Push first segment
SegmentInsertAction firstSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.size(0)
.build()
),
null
);
toolbox.getTaskActionClient().submit(firstSegmentInsertAction);
toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval1, lock1.getVersion()));
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
@ -118,18 +107,7 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
// Push second segment
SegmentInsertAction secondSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.size(0)
.build()
),
null
);
toolbox.getTaskActionClient().submit(secondSegmentInsertAction);
toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval2, lock2.getVersion()));
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
@ -141,4 +119,17 @@ public class RealtimeishTask extends AbstractTask
// Exit
return TaskStatus.success(getId());
}
private SegmentTransactionalInsertAction createSegmentInsertAction(Interval interval, String version)
{
final DataSegment segmentToInsert
= DataSegment.builder()
.dataSource("foo")
.interval(interval)
.version(version)
.size(0)
.build();
return SegmentTransactionalInsertAction
.appendAction(Collections.singleton(segmentToInsert), null, null, null);
}
}

View File

@ -63,11 +63,10 @@ import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@ -235,7 +234,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
private final String taskStorageType;
private ObjectMapper mapper;
private TaskStorageQueryAdapter tsqa = null;
private TaskQueryTool tsqa = null;
private TaskStorage taskStorage = null;
private TaskLockbox taskLockbox = null;
private TaskQueue taskQueue = null;
@ -478,7 +477,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster);
return taskStorage;
}
@ -592,7 +591,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskLockbox = new TaskLockbox(taskStorage, mdc);
tac = new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(
taskLockbox,
taskStorage,
@ -600,8 +598,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
emitter,
EasyMock.createMock(SupervisorManager.class),
mapper
),
new TaskAuditLogConfig(true)
)
);
taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFolder().toString())
@ -747,12 +744,10 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
final TaskStatus mergedStatus = runTask(indexTask);
final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
@ -1103,7 +1098,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1144,7 +1141,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1186,7 +1185,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1244,11 +1245,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());

View File

@ -52,12 +52,12 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
@ -269,14 +269,14 @@ public class OverlordTest
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
Assert.assertEquals(Optional.absent(), taskMaster.getRedirectLocation());
final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox, taskMaster);
final TaskQueryTool taskQueryTool = new TaskQueryTool(taskStorage, taskLockbox, taskMaster);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff
AuditManager auditManager = EasyMock.createNiceMock(AuditManager.class);
overlordResource = new OverlordResource(
taskMaster,
taskStorageQueryAdapter,
new IndexerMetadataStorageAdapter(taskStorageQueryAdapter, null),
taskQueryTool,
new IndexerMetadataStorageAdapter(taskStorage, null),
null,
null,
auditManager,

View File

@ -26,7 +26,7 @@ import com.google.inject.Injector;
import com.sun.jersey.spi.container.ResourceFilter;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.http.OverlordResource;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@ -60,7 +60,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
return ImmutableList.copyOf(
Iterables.concat(
getRequestPaths(OverlordResource.class, ImmutableList.of(
TaskStorageQueryAdapter.class,
TaskQueryTool.class,
AuthorizerMapper.class
)
),
@ -84,7 +84,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
private static boolean mockedOnceTsqa;
private static boolean mockedOnceSM;
private TaskStorageQueryAdapter tsqa;
private TaskQueryTool tsqa;
private SupervisorManager supervisorManager;
public OverlordSecurityResourceFilterTest(
@ -107,7 +107,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
// Since we are creating the mocked tsqa object only once and getting that object from Guice here therefore
// if the mockedOnce check is not done then we will call EasyMock.expect and EasyMock.replay on the mocked object
// multiple times and it will throw exceptions
tsqa = injector.getInstance(TaskStorageQueryAdapter.class);
tsqa = injector.getInstance(TaskQueryTool.class);
EasyMock.expect(tsqa.getTask(EasyMock.anyString())).andReturn(Optional.of(noopTask)).anyTimes();
EasyMock.replay(tsqa);
mockedOnceTsqa = true;

View File

@ -21,7 +21,7 @@ package org.apache.druid.indexing.overlord.http.security;
import com.google.common.base.Optional;
import com.sun.jersey.spi.container.ContainerRequest;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
@ -42,7 +42,7 @@ import static org.easymock.EasyMock.expect;
public class TaskResourceFilterTest
{
private AuthorizerMapper authorizerMapper;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private TaskQueryTool taskQueryTool;
private ContainerRequest containerRequest;
private TaskResourceFilter resourceFilter;
@ -50,9 +50,9 @@ public class TaskResourceFilterTest
public void setup()
{
authorizerMapper = EasyMock.createMock(AuthorizerMapper.class);
taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);
taskQueryTool = EasyMock.createMock(TaskQueryTool.class);
containerRequest = EasyMock.createMock(ContainerRequest.class);
resourceFilter = new TaskResourceFilter(taskStorageQueryAdapter, authorizerMapper);
resourceFilter = new TaskResourceFilter(taskQueryTool, authorizerMapper);
}
@Test
@ -68,11 +68,11 @@ public class TaskResourceFilterTest
expect(supervisorSpec.getDataSources())
.andReturn(Collections.singletonList(taskId))
.anyTimes();
expect(taskStorageQueryAdapter.getTask(taskId))
expect(taskQueryTool.getTask(taskId))
.andReturn(Optional.absent())
.atLeastOnce();
EasyMock.replay(containerRequest);
EasyMock.replay(taskStorageQueryAdapter);
EasyMock.replay(taskQueryTool);
WebApplicationException expected = null;
try {
@ -84,7 +84,7 @@ public class TaskResourceFilterTest
Assert.assertNotNull(expected);
Assert.assertEquals(expected.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
EasyMock.verify(containerRequest);
EasyMock.verify(taskStorageQueryAdapter);
EasyMock.verify(taskQueryTool);
}
private List<PathSegment> getPathSegments(String path)

View File

@ -62,7 +62,6 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -631,9 +630,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
objectMapper
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox,
new TaskAuditLogConfig(false)
taskActionToolbox
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{

View File

@ -20,6 +20,8 @@
package org.apache.druid.metadata;
import com.google.common.base.Optional;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -31,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
@ExtensionPoint
public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{
/**
@ -161,21 +164,34 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
void removeTasksOlderThan(long timestamp);
/**
* Add a log to the entry with the given id.
* Task logs are not used anymore and this method is never called by Druid code.
* It has been retained only for backwards compatibility with older extensions.
* New extensions must not implement this method.
*
* @param entryId entry id
* @param log log to add
* @return true if the log was added
* @throws DruidException of category UNSUPPORTED whenever called.
*/
boolean addLog(String entryId, LogType log);
@Deprecated
default boolean addLog(String entryId, LogType log)
{
throw DruidException.defensive()
.ofCategory(DruidException.Category.UNSUPPORTED)
.build("Task actions are not logged anymore.");
}
/**
* Returns the logs for the entry with the given id.
* Task logs are not used anymore and this method is never called by Druid code.
* It has been retained only for backwards compatibility with older extensions.
* New extensions must not implement this method.
*
* @param entryId entry id
* @return list of logs
* @throws DruidException of category UNSUPPORTED whenever called.
*/
List<LogType> getLogs(String entryId);
@Deprecated
default List<LogType> getLogs(String entryId)
{
throw DruidException.defensive()
.ofCategory(DruidException.Category.UNSUPPORTED)
.build("Task actions are not logged anymore.");
}
/**
* Returns the locks for the given entry
@ -188,7 +204,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
/**
* Returns the lock id for the given entry and the lock.
*
* @return lock id if found. Otherwise null.
* @return lock id if found, otherwise null.
*/
@Nullable
Long getLockId(String entryId, LockType lock);

View File

@ -25,6 +25,5 @@ public interface MetadataStorageActionHandlerTypes<EntryType, StatusType, LogTyp
{
TypeReference<EntryType> getEntryType();
TypeReference<StatusType> getStatusType();
TypeReference<LogType> getLogType();
TypeReference<LockType> getLockType();
}

View File

@ -43,6 +43,11 @@ public interface TaskLookup
COMPLETE
}
static TaskLookup activeTasksOnly()
{
return ActiveTaskLookup.getInstance();
}
/**
* Whether this lookup is guaranteed to not return any tasks.
*/

View File

@ -126,7 +126,7 @@ public interface Operator
*/
STOP,
/**
* Inidcates that the downstream processing should pause its pushing of results and instead return a
* Indicates that the downstream processing should pause its pushing of results and instead return a
* continuation object that encapsulates whatever state is required to resume processing. When this signal is
* received, Operators that are generating data might choose to exert backpressure or otherwise pause their
* processing efforts until called again with the returned continuation object.

View File

@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class ComposingProcessor implements Processor
{
@ -37,6 +39,16 @@ public class ComposingProcessor implements Processor
this.processors = processors;
}
@Override
public List<String> getOutputColumnNames()
{
List<String> outputColumnNames = new ArrayList<>();
for (Processor processor : processors) {
outputColumnNames.addAll(processor.getOutputColumnNames());
}
return outputColumnNames;
}
@JsonProperty("processors")
public Processor[] getProcessors()
{

View File

@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowLastProcessor;
import org.apache.druid.query.operator.window.value.WindowOffsetProcessor;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.List;
/**
* A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns
* object. Generally speaking, it is used to add or alter columns in a batch-oriented fashion.
@ -80,4 +82,9 @@ public interface Processor
* @return boolean identifying if these processors should be considered equivalent to each other.
*/
boolean validateEquivalent(Processor otherProcessor);
/**
* @return List of output column names for the Processor.
*/
List<String> getOutputColumnNames();
}

View File

@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatab
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class WindowFramedAggregateProcessor implements Processor
@ -45,6 +47,16 @@ public class WindowFramedAggregateProcessor implements Processor
private final WindowFrame frame;
private final AggregatorFactory[] aggregations;
@Override
public List<String> getOutputColumnNames()
{
List<String> outputColumnNames = new ArrayList<>();
for (AggregatorFactory aggregation : aggregations) {
outputColumnNames.add(aggregation.getName());
}
return outputColumnNames;
}
@JsonCreator
public WindowFramedAggregateProcessor(
@JsonProperty("frame") WindowFrame frame,

View File

@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class WindowPercentileProcessor implements Processor
{
private final int numBuckets;
private final String outputColumn;
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
@JsonCreator
public WindowPercentileProcessor(
@JsonProperty("outputColumn") String outputColumn,

View File

@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
@ -124,4 +125,9 @@ public abstract class WindowRankingProcessorBase implements Processor
return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn);
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -28,6 +28,9 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.segment.column.ColumnType;
import java.util.Collections;
import java.util.List;
public class WindowRowNumberProcessor implements Processor
{
private final String outputColumn;
@ -128,4 +131,10 @@ public class WindowRowNumberProcessor implements Processor
"outputColumn='" + outputColumn + '\'' +
'}';
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
public abstract class WindowValueProcessorBase implements Processor
@ -100,4 +102,10 @@ public abstract class WindowValueProcessorBase implements Processor
return "inputColumn=" + inputColumn +
", outputColumn='" + outputColumn + '\'';
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.singletonList(outputColumn);
}
}

View File

@ -41,7 +41,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -209,7 +208,8 @@ public class DefaultColumnSelectorFactoryMaker implements ColumnSelectorFactoryM
myClazz = float.class;
break;
case ARRAY:
myClazz = List.class;
myClazz = Object[].class;
break;
default:
throw DruidException.defensive("this class cannot handle type [%s]", columnAccessor.getType());
}

View File

@ -475,15 +475,15 @@ public class ExpressionSelectors
}
final Class<?> clazz = selector.classOfObject();
if (Number.class.isAssignableFrom(clazz) || String.class.isAssignableFrom(clazz)) {
// Number, String supported as-is.
if (Number.class.isAssignableFrom(clazz) || String.class.isAssignableFrom(clazz) || Object[].class.isAssignableFrom(clazz)) {
// Number, String, Arrays supported as-is.
return selector::getObject;
} else if (clazz.isAssignableFrom(Number.class) || clazz.isAssignableFrom(String.class)) {
// Might be Numbers and Strings. Use a selector that double-checks.
return () -> {
final Object val = selector.getObject();
if (val instanceof List) {
NonnullPair<ExpressionType, Object[]> coerced = ExprEval.coerceListToArray((List) val, homogenizeMultiValue);
NonnullPair<ExpressionType, Object[]> coerced = ExprEval.coerceListToArray((List<?>) val, homogenizeMultiValue);
if (coerced == null) {
return null;
}
@ -496,7 +496,7 @@ public class ExpressionSelectors
return () -> {
final Object val = selector.getObject();
if (val != null) {
NonnullPair<ExpressionType, Object[]> coerced = ExprEval.coerceListToArray((List) val, homogenizeMultiValue);
NonnullPair<ExpressionType, Object[]> coerced = ExprEval.coerceListToArray((List<?>) val, homogenizeMultiValue);
if (coerced == null) {
return null;
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.google.common.base.Optional;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Tests the default methods of the interface {@link MetadataStorageActionHandler}.
* Required only for coverage as these methods are already being tested in
* {@code SQLMetadataStorageActionHandlerTest}.
*/
public class MetadataStorageActionHandlerTest
{
private MetadataStorageActionHandler<String, String, String, String> handler;
@Before
public void setup()
{
this.handler = new MetadataStorageActionHandler<String, String, String, String>()
{
@Override
public void insert(
String id,
DateTime timestamp,
String dataSource,
String entry,
boolean active,
@Nullable String status,
String type,
String groupId
)
{
}
@Override
public boolean setStatus(String entryId, boolean active, String status)
{
return false;
}
@Override
public Optional<String> getEntry(String entryId)
{
return null;
}
@Override
public Optional<String> getStatus(String entryId)
{
return null;
}
@Nullable
@Override
public TaskInfo<String, String> getTaskInfo(String entryId)
{
return null;
}
@Override
public List<TaskInfo<String, String>> getTaskInfos(
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
return Collections.emptyList();
}
@Override
public List<TaskInfo<TaskIdentifier, String>> getTaskStatusList(
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
return Collections.emptyList();
}
@Override
public boolean addLock(String entryId, String lock)
{
return false;
}
@Override
public boolean replaceLock(String entryId, long oldLockId, String newLock)
{
return false;
}
@Override
public void removeLock(long lockId)
{
}
@Override
public void removeTasksOlderThan(long timestamp)
{
}
@Override
public Map<Long, String> getLocks(String entryId)
{
return Collections.emptyMap();
}
@Override
public Long getLockId(String entryId, String lock)
{
return 0L;
}
@Override
public void populateTaskTypeAndGroupIdAsync()
{
}
};
}
@Test
public void testAddLogThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.addLog("abcd", "logentry")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testGetLogsThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.getLogs("abcd")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
}

View File

@ -130,7 +130,7 @@ public class TaskLookupTest
@Test
public void testGetType()
{
Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType());
Assert.assertEquals(TaskLookupType.ACTIVE, TaskLookup.activeTasksOnly().getType());
}
}
}

View File

@ -27,6 +27,9 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class WindowProcessorOperatorTest
{
@Test
@ -53,6 +56,12 @@ public class WindowProcessorOperatorTest
{
return true;
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.emptyList();
}
},
InlineScanOperator.make(rac)
);

View File

@ -23,6 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
public class ComposingProcessorTest
{
@Test
@ -32,6 +35,7 @@ public class ComposingProcessorTest
final ProcessorForTesting secondProcessor = new ProcessorForTesting();
ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor);
Assert.assertTrue(proc.getOutputColumnNames().isEmpty());
proc.process(null);
Assert.assertEquals(1, firstProcessor.processCounter);
@ -70,5 +74,11 @@ public class ComposingProcessorTest
++validateCounter;
return validationResult;
}
@Override
public List<String> getOutputColumnNames()
{
return Collections.emptyList();
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.common.config.NullHandling;
@ -51,6 +52,7 @@ public class WindowFramedAggregateProcessorTest
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
};
WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames());
final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
"yay", new IntArrayColumn(new int[]{1, 2, 3})

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -42,6 +43,7 @@ public class WindowCumeDistProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist");
Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})

View File

@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -42,6 +43,7 @@ public class WindowDenseRankProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank");
Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.ranking;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -63,6 +65,11 @@ public class WindowPercentileProcessorTest
new WindowPercentileProcessor("10292", 10292)
);
Assert.assertEquals(
ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "10292"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.ranking;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.Processor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
@ -49,6 +51,8 @@ public class WindowRankProcessorTest
new WindowRankProcessor(orderingCols, "rankAsPercent", true)
);
Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
.expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9})

View File

@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
@ -49,6 +51,7 @@ public class WindowRowNumberProcessorTest
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
Processor processor = new WindowRowNumberProcessor("rowRow");
Assert.assertEquals(Collections.singletonList("rowRow"), processor.getOutputColumnNames());
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.value;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -59,6 +61,11 @@ public class WindowFirstProcessorTest
new WindowFirstProcessor("nullFirstCol", "NullFirstCol")
);
Assert.assertEquals(
ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", "NullFirstCol"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9})

View File

@ -19,6 +19,7 @@
package org.apache.druid.query.operator.window.value;
import com.google.common.collect.ImmutableList;
import org.apache.druid.query.operator.window.ComposingProcessor;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.LinkedHashMap;
@ -58,6 +60,10 @@ public class WindowLastProcessorTest
new WindowLastProcessor("objectCol", "LastObjectCol"),
new WindowLastProcessor("nullLastCol", "NullLastCol")
);
Assert.assertEquals(
ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", "NullLastCol"),
processor.getOutputColumnNames()
);
final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()

View File

@ -591,7 +591,22 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
settableSupplier.set(ImmutableList.of("1", "2", "3"));
Assert.assertArrayEquals(new String[]{"1", "2", "3"}, (Object[]) supplier.get());
}
@Test
public void test_supplierFromObjectSelector_onArray()
{
final SettableSupplier<Object[]> settableSupplier = new SettableSupplier<>();
final Supplier<Object> supplier = ExpressionSelectors.supplierFromObjectSelector(
objectSelectorFromSupplier(settableSupplier, Object[].class),
true
);
Assert.assertNotNull(supplier);
Assert.assertEquals(null, supplier.get());
settableSupplier.set(new String[]{"1", "2", "3"});
Assert.assertArrayEquals(new String[]{"1", "2", "3"}, (Object[]) supplier.get());
}
@Test

View File

@ -21,7 +21,6 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils;
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -46,12 +45,4 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
return sql + " FETCH FIRST :n ROWS ONLY";
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s WHERE %s_id in ("
+ " SELECT id FROM %s WHERE created_date < :date_time and active = false)",
getLogTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -44,13 +43,4 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
return sql + " LIMIT :n";
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s USING %s "
+ "WHERE %s_id = %s.id AND created_date < :date_time and active = false",
getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -518,25 +518,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
);
}
public void createLogTable(final String tableName, final String entryTypeName)
{
createTable(
tableName,
ImmutableList.of(
StringUtils.format(
"CREATE TABLE %1$s (\n"
+ " id %2$s NOT NULL,\n"
+ " %4$s_id VARCHAR(255) DEFAULT NULL,\n"
+ " log_payload %3$s,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
tableName, getSerialType(), getPayloadType(), entryTypeName
),
StringUtils.format("CREATE INDEX idx_%1$s_%2$s_id ON %1$s(%2$s_id)", tableName, entryTypeName)
)
);
}
public void createLockTable(final String tableName, final String entryTypeName)
{
createTable(
@ -814,7 +795,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
final String entryType = tablesConfig.getTaskEntryType();
prepareTaskEntryTable(tablesConfig.getEntryTable(entryType));
createLogTable(tablesConfig.getLogTable(entryType), entryType);
createLockTable(tablesConfig.getLockTable(entryType), entryType);
}
}

View File

@ -75,12 +75,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private final ObjectMapper jsonMapper;
private final TypeReference<EntryType> entryType;
private final TypeReference<StatusType> statusType;
private final TypeReference<LogType> logType;
private final TypeReference<LockType> lockType;
private final String entryTypeName;
private final String entryTable;
private final String logTable;
private final String lockTable;
private final TaskInfoMapper<EntryType, StatusType> taskInfoMapper;
@ -90,7 +88,11 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private Future<Boolean> taskMigrationCompleteFuture;
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
/**
* @deprecated Use the other constructor without {@code logTable} argument
* since this argument is now unused.
*/
@Deprecated
public SQLMetadataStorageActionHandler(
final SQLMetadataConnector connector,
final ObjectMapper jsonMapper,
@ -100,6 +102,19 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
final String logTable,
final String lockTable
)
{
this(connector, jsonMapper, types, entryTypeName, entryTable, lockTable);
}
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
public SQLMetadataStorageActionHandler(
final SQLMetadataConnector connector,
final ObjectMapper jsonMapper,
final MetadataStorageActionHandlerTypes<EntryType, StatusType, LogType, LockType> types,
final String entryTypeName,
final String entryTable,
final String lockTable
)
{
this.connector = connector;
//fully qualified references required below due to identical package names across project modules.
@ -108,11 +123,9 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
org.apache.druid.metadata.PasswordProviderRedactionMixIn.class);
this.entryType = types.getEntryType();
this.statusType = types.getStatusType();
this.logType = types.getLogType();
this.lockType = types.getLockType();
this.entryTypeName = entryTypeName;
this.entryTable = entryTable;
this.logTable = logTable;
this.lockTable = lockTable;
this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType);
this.taskStatusMapper = new TaskStatusMapper(jsonMapper);
@ -142,7 +155,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
protected String getLogTable()
{
return logTable;
throw new UnsupportedOperationException("'tasklogs' table is not used anymore");
}
protected String getEntryTypeName()
@ -430,7 +443,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
}
/**
* Wraps the given error in a user friendly DruidException.
* Wraps the given error in a user-friendly DruidException.
*/
private DruidException wrapInDruidException(String taskId, Throwable t)
{
@ -855,21 +868,13 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
DateTime dateTime = DateTimes.utc(timestamp);
connector.retryWithHandle(
handle -> {
handle.createStatement(getSqlRemoveLogsOlderThan())
.bind("date_time", dateTime.toString())
.execute();
handle ->
handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE created_date < :date_time AND active = false",
entryTable
)
)
.bind("date_time", dateTime.toString())
.execute();
return null;
}
).bind("date_time", dateTime.toString()).execute()
);
}
@ -880,78 +885,6 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
.execute();
}
@Override
public boolean addLog(final String entryId, final LogType log)
{
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (%2$s_id, log_payload) VALUES (:entryId, :payload)",
logTable, entryTypeName
)
)
.bind("entryId", entryId)
.bind("payload", jsonMapper.writeValueAsBytes(log))
.execute() == 1;
}
}
);
}
@Override
public List<LogType> getLogs(final String entryId)
{
return connector.retryWithHandle(
new HandleCallback<List<LogType>>()
{
@Override
public List<LogType> withHandle(Handle handle)
{
return handle
.createQuery(
StringUtils.format(
"SELECT log_payload FROM %1$s WHERE %2$s_id = :entryId",
logTable, entryTypeName
)
)
.bind("entryId", entryId)
.map(ByteArrayMapper.FIRST)
.fold(
new ArrayList<>(),
(List<LogType> list, byte[] bytes, FoldController control, StatementContext ctx) -> {
try {
list.add(jsonMapper.readValue(bytes, logType));
return list;
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize log")
.addData("entryId", entryId)
.addData("payload", StringUtils.fromUtf8(bytes))
.emit();
throw new SQLException(e);
}
}
);
}
}
);
}
@Deprecated
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format(
"DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+ "WHERE b.created_date < :date_time and b.active = false",
logTable, entryTable, entryTypeName
);
}
@Override
public Map<Long, LockType> getLocks(final String entryId)
{

View File

@ -56,7 +56,6 @@ public class SQLMetadataConnectorSchemaPersistenceTest
tables.add(tablesConfig.getSegmentsTable());
tables.add(tablesConfig.getRulesTable());
tables.add(tablesConfig.getLockTable(entryType));
tables.add(tablesConfig.getLogTable(entryType));
tables.add(tablesConfig.getEntryTable(entryType));
tables.add(tablesConfig.getAuditTable());
tables.add(tablesConfig.getSupervisorTable());
@ -67,7 +66,6 @@ public class SQLMetadataConnectorSchemaPersistenceTest
dropSequence.add(tablesConfig.getSegmentSchemasTable());
dropSequence.add(tablesConfig.getRulesTable());
dropSequence.add(tablesConfig.getLockTable(entryType));
dropSequence.add(tablesConfig.getLogTable(entryType));
dropSequence.add(tablesConfig.getEntryTable(entryType));
dropSequence.add(tablesConfig.getAuditTable());
dropSequence.add(tablesConfig.getSupervisorTable());

View File

@ -75,7 +75,6 @@ public class SQLMetadataConnectorTest
tables.add(tablesConfig.getSegmentsTable());
tables.add(tablesConfig.getRulesTable());
tables.add(tablesConfig.getLockTable(entryType));
tables.add(tablesConfig.getLogTable(entryType));
tables.add(tablesConfig.getEntryTable(entryType));
tables.add(tablesConfig.getAuditTable());
tables.add(tablesConfig.getSupervisorTable());

View File

@ -34,7 +34,6 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.joda.time.DateTime;
@ -73,12 +72,10 @@ public class SQLMetadataStorageActionHandlerTest
TestDerbyConnector connector = derbyConnectorRule.getConnector();
final String entryType = "entry";
final String logTable = "logs";
final String lockTable = "locks";
connector.prepareTaskEntryTable(entryTable);
connector.createLockTable(lockTable, entryType);
connector.createLogTable(logTable, entryType);
handler = new DerbyMetadataStorageActionHandler<>(
connector,
@ -101,12 +98,6 @@ public class SQLMetadataStorageActionHandlerTest
};
}
@Override
public TypeReference<Map<String, String>> getLogType()
{
return JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING;
}
@Override
public TypeReference<Map<String, Object>> getLockType()
{
@ -117,7 +108,7 @@ public class SQLMetadataStorageActionHandlerTest
},
entryType,
entryTable,
logTable,
null,
lockTable
);
}
@ -247,36 +238,30 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test
public void testLogs()
public void testAddLogThrowsUnsupportedException()
{
final String entryId = "abcd";
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableList.of(),
handler.getLogs("non_exist_entry")
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.addLog("abcd", ImmutableMap.of("logentry", "created"))
);
Assert.assertEquals(
ImmutableMap.of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, String> log1 = ImmutableMap.of("logentry", "created");
final ImmutableMap<String, String> log2 = ImmutableMap.of("logentry", "updated");
Assert.assertTrue(handler.addLog(entryId, log1));
Assert.assertTrue(handler.addLog(entryId, log2));
Assert.assertEquals(
ImmutableList.of(log1, log2),
handler.getLogs(entryId)
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testGetLogsThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.getLogs("abcd")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testLocks()
@ -388,19 +373,16 @@ public class SQLMetadataStorageActionHandlerTest
Map<String, Object> entry1 = ImmutableMap.of("numericId", 1234);
Map<String, Object> status1 = ImmutableMap.of("count", 42, "temp", 1);
handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group");
Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
final String entryId2 = "ABC123";
Map<String, Object> entry2 = ImmutableMap.of("a", 1);
Map<String, Object> status2 = ImmutableMap.of("count", 42);
handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group");
Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
final String entryId3 = "DEF5678";
Map<String, Object> entry3 = ImmutableMap.of("numericId", 5678);
Map<String, Object> status3 = ImmutableMap.of("count", 21, "temp", 2);
handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group");
Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2));
@ -438,10 +420,6 @@ public class SQLMetadataStorageActionHandlerTest
.collect(Collectors.toList())
);
// tasklogs
Assert.assertEquals(0, handler.getLogs(entryId1).size());
Assert.assertEquals(1, handler.getLogs(entryId2).size());
Assert.assertEquals(1, handler.getLogs(entryId3).size());
}
@Test

View File

@ -58,7 +58,6 @@ import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -75,9 +74,9 @@ import org.apache.druid.indexing.overlord.MetadataTaskStorage;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningConfig;
import org.apache.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
@ -209,7 +208,6 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.tasklock", TaskLockConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
@ -231,7 +229,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(TaskQueryTool.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageAdapter.class).in(LazySingleton.class);
binder.bind(SupervisorManager.class).in(LazySingleton.class);

View File

@ -78,7 +78,6 @@ import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -483,7 +482,6 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
configureTaskActionClient(binder);

View File

@ -320,6 +320,36 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.run();
}
@Test
public void testWithArrayConcat()
{
testBuilder()
.sql("select countryName, cityName, channel, "
+ "array_concat_agg(ARRAY['abc', channel], 10000) over (partition by cityName order by countryName) as c\n"
+ "from wikipedia\n"
+ "where countryName in ('Austria', 'Republic of Korea') "
+ "and (cityName in ('Vienna', 'Seoul') or cityName is null)\n"
+ "group by countryName, cityName, channel")
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true
))
.expectedResults(
ResultMatchMode.RELAX_NULLS,
ImmutableList.of(
new Object[]{"Austria", null, "#de.wikipedia", "[\"abc\",\"#de.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#en.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#ja.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", null, "#ko.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#en.wikipedia\",\"abc\",\"#ja.wikipedia\",\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", "[\"abc\",\"#ko.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#de.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#es.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"},
new Object[]{"Austria", "Vienna", "#tr.wikipedia", "[\"abc\",\"#de.wikipedia\",\"abc\",\"#es.wikipedia\",\"abc\",\"#tr.wikipedia\"]"}
)
)
.run();
}
private WindowOperatorQuery getWindowOperatorQuery(List<Query<?>> queries)
{
assertEquals(1, queries.size());

View File

@ -7533,4 +7533,78 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
{
windowQueryTest();
}
/*
Druid query tests
*/
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1")
@Test
public void test_same_window_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window")
@Test
public void test_same_window_wikipedia_query_1_named_window()
{
windowQueryTest();
}
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1")
@Test
public void test_multiple_windows_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows")
@Test
public void test_multiple_windows_wikipedia_query_1_named_windows()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1")
@Test
public void test_shuffle_columns_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1")
@Test
public void test_shuffle_columns_wikipedia_query_1_shuffle_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2")
@Test
public void test_shuffle_columns_wikipedia_query_2()
{
windowQueryTest();
}
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1")
@Test
public void test_shuffle_columns_wikipedia_query_2_shuffle_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1")
@Test
public void test_partition_by_multiple_columns_wikipedia_query_1()
{
windowQueryTest();
}
@DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2")
@Test
public void test_partition_by_multiple_columns_wikipedia_query_2()
{
windowQueryTest();
}
}

View File

@ -0,0 +1,13 @@
null Austria 1 1
null Republic of Korea 1 2
null Republic of Korea 2 3
null Republic of Korea 3 4
Horsching Austria 2 1
Jeonju Republic of Korea 4 1
Seongnam-si Republic of Korea 5 1
Seoul Republic of Korea 6 1
Suwon-si Republic of Korea 7 1
Vienna Austria 3 1
Vienna Austria 4 2
Vienna Austria 5 3
Yongsan-dong Republic of Korea 8 1

View File

@ -0,0 +1,6 @@
select cityName, countryName,
row_number() over (partition by countryName order by countryName, cityName, channel) as c1,
count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel

View File

@ -0,0 +1,13 @@
null Austria 1 1
null Republic of Korea 1 2
null Republic of Korea 2 3
null Republic of Korea 3 4
Horsching Austria 2 1
Jeonju Republic of Korea 4 1
Seongnam-si Republic of Korea 5 1
Seoul Republic of Korea 6 1
Suwon-si Republic of Korea 7 1
Vienna Austria 3 1
Vienna Austria 4 2
Vienna Austria 5 3
Yongsan-dong Republic of Korea 8 1

View File

@ -0,0 +1,9 @@
select cityName, countryName,
row_number() over w1 as c1,
count(channel) over w2 as c2
from wikipedia
where countryName in ('Austria', 'Republic of Korea')
group by countryName, cityName, channel
WINDOW
w1 AS (partition by countryName order by countryName, cityName, channel),
w2 AS (partition by cityName order by countryName, cityName, channel)

View File

@ -0,0 +1,15 @@
Austria null 94 7
Austria null 4685 7
Austria null 14 7
Austria null 0 7
Austria null 272 7
Austria null 0 7
Austria null 6979 7
Guatemala null 0 1
Guatemala El Salvador 1 1
Guatemala Guatemala City 173 1
Austria Horsching 0 1
Austria Vienna 93 4
Austria Vienna 72 4
Austria Vienna 0 4
Austria Vienna 0 4

View File

@ -0,0 +1,7 @@
SELECT
countryName,
cityName,
added,
count(added) OVER (PARTITION BY countryName, cityName)
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')

View File

@ -0,0 +1,15 @@
Austria null 0 7 12044 1
Austria null 0 7 12044 2
Austria null 14 7 12044 1
Austria null 94 7 12044 1
Austria null 272 7 12044 1
Austria null 4685 7 12044 1
Austria null 6979 7 12044 1
Guatemala null 0 1 0 1
Guatemala El Salvador 1 1 1 1
Guatemala Guatemala City 173 1 173 1
Austria Horsching 0 1 0 1
Austria Vienna 0 4 165 1
Austria Vienna 0 4 165 2
Austria Vienna 72 4 165 1
Austria Vienna 93 4 165 1

View File

@ -0,0 +1,9 @@
SELECT
countryName,
cityName,
added,
count(added) OVER (PARTITION BY countryName, cityName),
sum(added) OVER (PARTITION BY countryName, cityName),
ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added)
FROM "wikipedia"
where countryName in ('Guatemala', 'Austria')

Some files were not shown because too many files have changed in this diff Show More