Materialized view implementation (#5556)

* implement materialized view

* modify code according to jihoonson's comments

* modify code according to jihoonson's comments - 2

* add documentation about materialized view

* use new HadoopTuningConfig in pr 5583

* add minDataLag and fix optimizer bug

* correct value of DEFAULT_MIN_DATA_LAG_MS

* modify code according to jihoonson's comments - 3

* use the boolean expression instead of if-else
zhangxinyu 2018-06-10 03:24:54 +08:00 committed by Jihoon Son
parent 6f0aedd6ab
commit e43e5ebbcd
32 changed files with 3948 additions and 0 deletions

View File

@@ -243,6 +243,10 @@
<argument>io.druid.extensions.contrib:druid-time-min-max</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:druid-virtual-columns</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:materialized-view-maintenance</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:materialized-view-selection</argument>
</arguments>
</configuration>
</execution>

View File

@@ -0,0 +1,114 @@
---
layout: doc_page
---
# Materialized View
To use this feature, make sure to load materialized-view-selection only on Brokers and materialized-view-maintenance only on the Overlord. In addition, this feature currently requires a Hadoop cluster.
This feature can greatly improve query performance, especially when the queried dataSource has a very large number of dimensions but the query only needs a few of them. The feature has two parts: `materialized-view-maintenance` and `materialized-view-selection`.
## Materialized-view-maintenance
In materialized-view-maintenance, the dataSources that users ingest are called "base-dataSources". For each base-dataSource, we can submit `derivativeDataSource` supervisors to create and maintain other dataSources, which we call "derived-dataSources". The dimensions and metrics of a derived-dataSource are a subset of the base-dataSource's.
The `derivativeDataSource` supervisor keeps the timeline of the derived-dataSource consistent with that of the base-dataSource. Each `derivativeDataSource` supervisor is responsible for one derived-dataSource.
A sample derivativeDataSource supervisor spec is shown below:
```json
{
"type": "derivativeDataSource",
"baseDataSource": "wikiticker",
"dimensionsSpec": {
"dimensions": [
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "added",
"type": "longSum",
"fieldName": "added"
}
],
"tuningConfig": {
"type": "hadoop"
}
}
```
**Supervisor Configuration**
|Field|Description|Required|
|--------|-----------|---------|
|type |The supervisor type. This should always be `derivativeDataSource`. |yes|
|baseDataSource |The name of the base dataSource. This dataSource's data should already be stored in Druid, and it will be used as the input data. See [dataSource inputSpec](http://druid.io/docs/latest/ingestion/update-existing-data.html#datasource "dataSource inputSpec"). |yes|
|dimensionsSpec |Specifies the dimensions of the data. These dimensions must be a subset of the baseDataSource's dimensions. |yes|
|metricsSpec |A list of aggregators. These metrics must be a subset of the baseDataSource's metrics. See [aggregations](http://druid.io/docs/latest/querying/aggregations.html "aggregations"). |yes|
|tuningConfig |The tuningConfig must be a HadoopTuningConfig. See [hadoop tuning config](http://druid.io/docs/latest/ingestion/batch-ingestion.html#tuningconfig "hadoop tuning config"). |yes|
|dataSource |The name of this derived dataSource. |no (default = baseDataSource plus a hash of the supervisor spec)|
|hadoopDependencyCoordinates |A JSON array of Hadoop dependency coordinates that Druid will use; this overrides the default Hadoop coordinates. Once specified, Druid will look for those Hadoop dependencies in the location specified by `druid.extensions.hadoopDependenciesDir`. |no|
|classpathPrefix |Classpath that will be prepended for the peon process. |no|
|context |See below. |no|
**Context**
|Field|Description|Required|
|--------|-----------|---------|
|maxTaskCount |The maximum number of tasks the supervisor can submit simultaneously. |no (default = 1)|
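
For reference, a supervisor spec like the one above is typically submitted to the Overlord's standard supervisor endpoint, `/druid/indexer/v1/supervisor` (remember that materialized-view-maintenance must be loaded on the Overlord). The following is a minimal sketch using the JDK's `HttpURLConnection`; the Overlord host, the port (8090 is a common default), and the spec file name are placeholders.
```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SubmitSupervisorSpec
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder file containing the derivativeDataSource supervisor spec shown above.
    String spec = new String(Files.readAllBytes(Paths.get("derivative-supervisor.json")), StandardCharsets.UTF_8);
    // Placeholder Overlord address; /druid/indexer/v1/supervisor is Druid's standard supervisor API.
    URL url = new URL("http://OVERLORD_HOST:8090/druid/indexer/v1/supervisor");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json");
    try (OutputStream os = conn.getOutputStream()) {
      os.write(spec.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}
```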
## Materialized-view-selection
Materialized-view-selection adds a new query type, `view`. When a view query is issued, Druid tries to optimize it based on the query's dataSource and intervals.
A sample view query spec is shown below:
```json
{
"queryType": "view",
"query": {
"queryType": "groupBy",
"dataSource": "wikiticker",
"granularity": "all",
"dimensions": [
"user"
],
"limitSpec": {
"type": "default",
"limit": 1,
"columns": [
{
"dimension": "added",
"direction": "descending",
"dimensionOrder": "numeric"
}
]
},
"aggregations": [
{
"type": "longSum",
"name": "added",
"fieldName": "added"
}
],
"intervals": [
"2015-09-12/2015-09-13"
]
}
}
```
A view query has two parts:
|Field|Description|Required|
|--------|-----------|---------|
|queryType |The query type. This should always be `view`. |yes|
|query |The inner query to run. It must be a [groupBy](http://druid.io/docs/latest/querying/groupbyquery.html "groupby"), [topN](http://druid.io/docs/latest/querying/topnquery.html "topn"), or [timeseries](http://druid.io/docs/latest/querying/timeseriesquery.html "timeseries") query. |yes|
**Note that Materialized View is currently designated as experimental. Please make sure the clocks of all nodes are synchronized and increase monotonically; otherwise query results may contain unexpected errors.**
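
Since materialized-view-selection is loaded on Brokers, a view query is issued like any other native query, POSTed to the Broker's standard query endpoint `/druid/v2`. A minimal sketch, using Java 11's `HttpClient` for brevity; the Broker host, the port (8082 is a common default), and the query file name are placeholders.
```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SubmitViewQuery
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder file containing the view query shown above.
    String query = Files.readString(Paths.get("view-query.json"));
    // Placeholder Broker address; /druid/v2 is Druid's standard native query endpoint.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://BROKER_HOST:8082/druid/v2"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(query))
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}
```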

View File

@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.13.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>materialized-view-maintenance</artifactId>
<name>materialized-view-maintenance</name>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,129 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.druid.indexing.overlord.DataSourceMetadata;
import java.util.Objects;
import java.util.Set;
public class DerivativeDataSourceMetadata implements DataSourceMetadata
{
private final String baseDataSource;
private final Set<String> dimensions;
private final Set<String> metrics;
@JsonCreator
public DerivativeDataSourceMetadata(
@JsonProperty("baseDataSource") String baseDataSource,
@JsonProperty("dimensions") Set<String> dimensions,
@JsonProperty("metrics") Set<String> metrics
)
{
this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");
this.dimensions = Preconditions.checkNotNull(dimensions, "dimensions cannot be null. This is not a valid DerivativeDataSourceMetadata.");
this.metrics = Preconditions.checkNotNull(metrics, "metrics cannot be null. This is not a valid DerivativeDataSourceMetadata.");
}
@JsonProperty("baseDataSource")
public String getBaseDataSource()
{
return baseDataSource;
}
@JsonProperty("dimensions")
public Set<String> getDimensions()
{
return dimensions;
}
@JsonProperty("metrics")
public Set<String> getMetrics()
{
return metrics;
}
@Override
public boolean isValidStart()
{
return false;
}
@Override
public boolean matches(DataSourceMetadata other)
{
return equals(other);
}
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
throw new UnsupportedOperationException("Derivative dataSource metadata is not allowed to plus");
}
@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
throw new UnsupportedOperationException("Derivative dataSource metadata is not allowed to minus");
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DerivativeDataSourceMetadata that = (DerivativeDataSourceMetadata) o;
return baseDataSource.equals(that.getBaseDataSource()) &&
dimensions.equals(that.getDimensions()) &&
metrics.equals(that.getMetrics());
}
@Override
public int hashCode()
{
return Objects.hash(baseDataSource, dimensions, metrics);
}
public Set<String> getColumns()
{
Set<String> fields = Sets.newHashSet(dimensions);
fields.addAll(metrics);
return fields;
}
@Override
public String toString()
{
return "DerivedDataSourceMetadata{" +
"baseDataSource=" + baseDataSource +
", dimensions=" + dimensions +
", metrics=" + metrics +
'}';
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.guice.JsonConfigProvider;
import io.druid.initialization.DruidModule;
import java.util.List;
public class MaterializedViewMaintenanceDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(MaterializedViewSupervisorSpec.class, "derivativeDataSource"),
new NamedType(DerivativeDataSourceMetadata.class, "derivativeDataSource")
)
);
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.materialized.view.task", MaterializedViewTaskConfig.class);
}
}

View File

@@ -0,0 +1,471 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.MetadataSupervisorManager;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
public class MaterializedViewSupervisor implements Supervisor
{
private static final EmittingLogger log = new EmittingLogger(MaterializedViewSupervisor.class);
private static final Interval ALL_INTERVAL = Intervals.of("0000-01-01/3000-01-01");
private static final int DEFAULT_MAX_TASK_COUNT = 1;
// A minimum lag is kept between the derived dataSources and the base dataSource to prevent repeatedly rebuilding segments for late-arriving data.
private static final long DEFAULT_MIN_DATA_LAG_MS = 24 * 3600 * 1000L;
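// DEFAULT_MIN_DATA_LAG_MS is 24 hours (86,400,000 ms); it can be overridden per supervisor via the "minDataLagMs" context key (see the constructor).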
private final MetadataSupervisorManager metadataSupervisorManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private final SQLMetadataSegmentManager segmentManager;
private final MaterializedViewSupervisorSpec spec;
private final TaskMaster taskMaster;
private final TaskStorage taskStorage;
private final MaterializedViewTaskConfig config;
private final String dataSource;
private final String supervisorId;
private final int maxTaskCount;
private final long minDataLagMs;
private final Map<Interval, HadoopIndexTask> runningTasks = Maps.newHashMap();
private final Map<Interval, String> runningVersion = Maps.newHashMap();
// taskLock is used to synchronize runningTasks and runningVersion
private final Object taskLock = new Object();
// stateLock is used to synchronize materializedViewSupervisor's status
private final Object stateLock = new Object();
private boolean started = false;
private ListenableFuture<?> future = null;
private ListeningScheduledExecutorService exec = null;
// In the missing intervals, baseDataSource has data but derivedDataSource does not, which means
// data in these intervals of derivedDataSource needs to be rebuilt.
private Set<Interval> missInterval = Sets.newHashSet();
public MaterializedViewSupervisor(
TaskMaster taskMaster,
TaskStorage taskStorage,
MetadataSupervisorManager metadataSupervisorManager,
SQLMetadataSegmentManager segmentManager,
IndexerMetadataStorageCoordinator metadataStorageCoordinator,
MaterializedViewTaskConfig config,
MaterializedViewSupervisorSpec spec
)
{
this.taskMaster = taskMaster;
this.taskStorage = taskStorage;
this.metadataStorageCoordinator = metadataStorageCoordinator;
this.segmentManager = segmentManager;
this.metadataSupervisorManager = metadataSupervisorManager;
this.config = config;
this.spec = spec;
this.dataSource = spec.getDataSourceName();
this.supervisorId = StringUtils.format("MaterializedViewSupervisor-%s", dataSource);
this.maxTaskCount = spec.getContext().containsKey("maxTaskCount")
? Integer.parseInt(String.valueOf(spec.getContext().get("maxTaskCount")))
: DEFAULT_MAX_TASK_COUNT;
this.minDataLagMs = spec.getContext().containsKey("minDataLagMs")
? Long.parseLong(String.valueOf(spec.getContext().get("minDataLagMs")))
: DEFAULT_MIN_DATA_LAG_MS;
}
@Override
public void start()
{
synchronized (stateLock) {
Preconditions.checkState(!started, "already started");
DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (null == metadata) {
metadataStorageCoordinator.insertDataSourceMetadata(
dataSource,
new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
);
}
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
final Duration delay = config.getTaskCheckDuration().toStandardDuration();
future = exec.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run()
{
try {
DataSourceMetadata metadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (metadata instanceof DerivativeDataSourceMetadata
&& spec.getBaseDataSource().equals(((DerivativeDataSourceMetadata) metadata).getBaseDataSource())
&& spec.getDimensions().equals(((DerivativeDataSourceMetadata) metadata).getDimensions())
&& spec.getMetrics().equals(((DerivativeDataSourceMetadata) metadata).getMetrics())) {
checkSegmentsAndSubmitTasks();
} else {
log.error(
"Failed to start %s. Metadata in database(%s) is different from new dataSource metadata(%s)",
supervisorId,
metadata,
spec
);
}
}
catch (Exception e) {
log.makeAlert(e, StringUtils.format("uncaught exception in %s.", supervisorId)).emit();
}
}
},
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
}
}
@Override
public void stop(boolean stopGracefully)
{
synchronized (stateLock) {
Preconditions.checkState(started, "not started");
// stop all schedulers and threads
if (stopGracefully) {
synchronized (taskLock) {
future.cancel(false);
future = null;
exec.shutdownNow();
exec = null;
clearTasks();
if (!(metadataSupervisorManager.getLatest().get(supervisorId) instanceof MaterializedViewSupervisorSpec)) {
clearSegments();
}
}
} else {
future.cancel(true);
future = null;
exec.shutdownNow();
exec = null;
synchronized (taskLock) {
clearTasks();
if (!(metadataSupervisorManager.getLatest().get(supervisorId) instanceof MaterializedViewSupervisorSpec)) {
clearSegments();
}
}
}
started = false;
}
}
@Override
public SupervisorReport getStatus()
{
return new MaterializedViewSupervisorReport(
dataSource,
DateTimes.nowUtc(),
spec.getBaseDataSource(),
spec.getDimensions(),
spec.getMetrics(),
JodaUtils.condenseIntervals(missInterval)
);
}
@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
if (dataSourceMetadata == null) {
// if oldMetadata is different from spec, tasks and segments will be removed when reset.
DataSourceMetadata oldMetadata = metadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (oldMetadata instanceof DerivativeDataSourceMetadata) {
if (!((DerivativeDataSourceMetadata) oldMetadata).getBaseDataSource().equals(spec.getBaseDataSource()) ||
!((DerivativeDataSourceMetadata) oldMetadata).getDimensions().equals(spec.getDimensions()) ||
!((DerivativeDataSourceMetadata) oldMetadata).getMetrics().equals(spec.getMetrics())) {
synchronized (taskLock) {
clearTasks();
clearSegments();
}
}
}
commitDataSourceMetadata(
new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
);
} else {
throw new IAE("DerivedDataSourceMetadata is not allowed to reset to a new DerivedDataSourceMetadata");
}
}
@Override
public void checkpoint(
@Nullable String sequenceName,
@Nullable DataSourceMetadata previousCheckPoint,
@Nullable DataSourceMetadata currentCheckPoint
)
{
// do nothing
}
/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.
*/
@VisibleForTesting
void checkSegmentsAndSubmitTasks()
{
synchronized (taskLock) {
for (Map.Entry<Interval, HadoopIndexTask> entry : runningTasks.entrySet()) {
Optional<TaskStatus> taskStatus = taskStorage.getStatus(entry.getValue().getId());
if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) {
runningTasks.remove(entry.getKey());
runningVersion.remove(entry.getKey());
}
}
if (runningTasks.size() == maxTaskCount) {
// if the number of running tasks reaches the max task count, the supervisor won't submit new tasks.
return;
}
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildIntervalAndBaseSegments = checkSegments();
SortedMap<Interval, String> sortedToBuildVersion = toBuildIntervalAndBaseSegments.lhs;
Map<Interval, List<DataSegment>> baseSegments = toBuildIntervalAndBaseSegments.rhs;
missInterval = sortedToBuildVersion.keySet();
submitTasks(sortedToBuildVersion, baseSegments);
}
}
/**
* Find information about the intervals in which the derived dataSource's data should be rebuilt.
* The information includes the version and the DataSegment list of each interval.
* An interval needs rebuilding if, within that interval,
* 1) the baseDataSource has data but the derivedDataSource does not;
* 2) the version of the derived segments isn't the max(created_date) of all base segments;
*
* Drop the segments of the intervals in which derivedDataSource has data, but baseDataSource does not.
*
* @return the left part of Pair: interval -> version, and the right part: interval -> DataSegment list.
* Version and DataSegment list can be used to create HadoopIndexTask.
* Derived datasource data in all these intervals need to be rebuilt.
*/
@VisibleForTesting
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegments()
{
// Pair<interval -> version, interval -> list<DataSegment>>
Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> derivativeSegmentsSnapshot =
getVersionAndBaseSegments(
metadataStorageCoordinator.getUsedSegmentsForInterval(
dataSource,
ALL_INTERVAL
)
);
// Pair<interval -> max(created_date), interval -> list<DataSegment>>
Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
getMaxCreateDateAndBaseSegments(
metadataStorageCoordinator.getUsedSegmentAndCreatedDateForInterval(
spec.getBaseDataSource(),
ALL_INTERVAL
)
);
// baseSegments are used to create HadoopIndexTask
Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
Map<Interval, List<DataSegment>> derivativeSegments = derivativeSegmentsSnapshot.rhs;
// use max created_date of base segments as the version of derivative segments
Map<Interval, String> maxCreatedDate = baseSegmentsSnapshot.lhs;
Map<Interval, String> derivativeVersion = derivativeSegmentsSnapshot.lhs;
SortedMap<Interval, String> sortedToBuildInterval = Maps.newTreeMap(
Comparators.inverse(Comparators.intervalsByStartThenEnd())
);
// find the intervals to drop and to build
MapDifference<Interval, String> difference = Maps.difference(maxCreatedDate, derivativeVersion);
Map<Interval, String> toBuildInterval = Maps.newHashMap(difference.entriesOnlyOnLeft());
Map<Interval, String> toDropInterval = Maps.newHashMap(difference.entriesOnlyOnRight());
// if an interval is in running tasks and the versions are the same, remove it from toBuildInterval.
// if an interval is in running tasks but the versions are different, stop the task.
for (Interval interval : runningVersion.keySet()) {
if (toBuildInterval.containsKey(interval)
&& toBuildInterval.get(interval).equals(runningVersion.get(interval))
) {
toBuildInterval.remove(interval);
} else if (
toBuildInterval.containsKey(interval)
&& !toBuildInterval.get(interval).equals(runningVersion.get(interval))
) {
if (taskMaster.getTaskQueue().isPresent()) {
taskMaster.getTaskQueue().get().shutdown(runningTasks.get(interval).getId());
runningTasks.remove(interval);
}
}
}
// drop derivative segments whose intervals are in toDropInterval
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
segmentManager.removeSegment(dataSource, segment.getIdentifier());
}
}
// data of the latest interval will be built first.
sortedToBuildInterval.putAll(toBuildInterval);
return new Pair<>(sortedToBuildInterval, baseSegments);
}
private void submitTasks(
SortedMap<Interval, String> sortedToBuildVersion,
Map<Interval, List<DataSegment>> baseSegments
)
{
for (Map.Entry<Interval, String> entry : sortedToBuildVersion.entrySet()) {
if (runningTasks.size() < maxTaskCount) {
HadoopIndexTask task = spec.createTask(entry.getKey(), entry.getValue(), baseSegments.get(entry.getKey()));
try {
if (taskMaster.getTaskQueue().isPresent()) {
taskMaster.getTaskQueue().get().add(task);
runningVersion.put(entry.getKey(), entry.getValue());
runningTasks.put(entry.getKey(), task);
}
}
catch (EntryExistsException e) {
log.error("task %s already exsits", task);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getVersionAndBaseSegments(
List<DataSegment> snapshot
)
{
Map<Interval, String> versions = Maps.newHashMap();
Map<Interval, List<DataSegment>> segments = Maps.newHashMap();
for (DataSegment segment : snapshot) {
Interval interval = segment.getInterval();
versions.put(interval, segment.getVersion());
segments.putIfAbsent(interval, Lists.newArrayList());
segments.get(interval).add(segment);
}
return new Pair<>(versions, segments);
}
private Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> getMaxCreateDateAndBaseSegments(
List<Pair<DataSegment, String>> snapshot
)
{
Interval maxAllowedToBuildInterval = snapshot.parallelStream()
.map(pair -> pair.lhs)
.map(DataSegment::getInterval)
.max(Comparators.intervalsByStartThenEnd())
.get();
Map<Interval, String> maxCreatedDate = Maps.newHashMap();
Map<Interval, List<DataSegment>> segments = Maps.newHashMap();
for (Pair<DataSegment, String> entry : snapshot) {
DataSegment segment = entry.lhs;
String createDate = entry.rhs;
Interval interval = segment.getInterval();
if (!hasEnoughLag(interval, maxAllowedToBuildInterval)) {
continue;
}
maxCreatedDate.put(
interval,
DateTimes.max(
DateTimes.of(createDate),
DateTimes.of(maxCreatedDate.getOrDefault(interval, DateTimes.MIN.toString()))
).toString()
);
segments.putIfAbsent(interval, Lists.newArrayList());
segments.get(interval).add(segment);
}
return new Pair<>(maxCreatedDate, segments);
}
/**
* Check whether the start millis of the target interval lags behind the start of maxInterval by at least minDataLagMs.
* The minimum data lag prevents repeatedly rebuilding data because of late-arriving data. For example, with the
* default 24-hour lag, an interval starting less than one day before the newest base interval's start is skipped.
*
* @param target the interval to check
* @param maxInterval the newest base segment interval
* @return true if the start millis of the target interval lags behind maxInterval's start by at least minDataLagMs
*/
private boolean hasEnoughLag(Interval target, Interval maxInterval)
{
return minDataLagMs <= (maxInterval.getStartMillis() - target.getStartMillis());
}
private void clearTasks()
{
for (HadoopIndexTask task : runningTasks.values()) {
if (taskMaster.getTaskQueue().isPresent()) {
taskMaster.getTaskQueue().get().shutdown(task.getId());
}
}
runningTasks.clear();
runningVersion.clear();
}
private void clearSegments()
{
log.info("Clear all metadata of dataSource %s", dataSource);
metadataStorageCoordinator.deletePendingSegments(dataSource, ALL_INTERVAL);
segmentManager.removeDatasource(dataSource);
metadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
}
private void commitDataSourceMetadata(DataSourceMetadata dataSourceMetadata)
{
if (!metadataStorageCoordinator.insertDataSourceMetadata(dataSource, dataSourceMetadata)) {
try {
metadataStorageCoordinator.resetDataSourceMetadata(
dataSource,
dataSourceMetadata
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.google.common.collect.Sets;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Set;
public class MaterializedViewSupervisorReport extends SupervisorReport
{
public MaterializedViewSupervisorReport(
String dataSource,
DateTime generationTime,
String baseDataSource,
Set<String> dimensions,
Set<String> metrics,
List<Interval> missTimeline
)
{
super(dataSource, generationTime, "{" +
"dataSource='" + dataSource + '\'' +
", baseDataSource='" + baseDataSource + '\'' +
", dimensions=" + dimensions +
", metrics=" + metrics +
", missTimeline" + Sets.newHashSet(missTimeline) +
"}");
}
}

View File

@@ -0,0 +1,344 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorSpec;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.metadata.MetadataSupervisorManager;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.transform.TransformSpec;
import io.druid.server.security.AuthorizerMapper;
import io.druid.timeline.DataSegment;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MaterializedViewSupervisorSpec implements SupervisorSpec
{
private static final String TASK_PREFIX = "index_materialized_view";
private final String baseDataSource;
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] aggregators;
private final HadoopTuningConfig tuningConfig;
private final String dataSourceName;
private final String hadoopCoordinates;
private final List<String> hadoopDependencyCoordinates;
private final String classpathPrefix;
private final Map<String, Object> context;
private final Set<String> metrics;
private final Set<String> dimensions;
private final ObjectMapper objectMapper;
private final MetadataSupervisorManager metadataSupervisorManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private final SQLMetadataSegmentManager segmentManager;
private final TaskMaster taskMaster;
private final TaskStorage taskStorage;
private final MaterializedViewTaskConfig config;
private final AuthorizerMapper authorizerMapper;
private final ChatHandlerProvider chatHandlerProvider;
public MaterializedViewSupervisorSpec(
@JsonProperty("baseDataSource") String baseDataSource,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("tuningConfig") HadoopTuningConfig tuningConfig,
@JsonProperty("dataSource") String dataSourceName,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix,
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ObjectMapper objectMapper,
@JacksonInject TaskMaster taskMaster,
@JacksonInject TaskStorage taskStorage,
@JacksonInject MetadataSupervisorManager metadataSupervisorManager,
@JacksonInject SQLMetadataSegmentManager segmentManager,
@JacksonInject IndexerMetadataStorageCoordinator metadataStorageCoordinator,
@JacksonInject MaterializedViewTaskConfig config,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider
)
{
this.baseDataSource = Preconditions.checkNotNull(
baseDataSource,
"baseDataSource cannot be null. Please provide a baseDataSource."
);
this.dimensionsSpec = Preconditions.checkNotNull(
dimensionsSpec,
"dimensionsSpec cannot be null. Please provide a dimensionsSpec"
);
this.aggregators = Preconditions.checkNotNull(
aggregators,
"metricsSpec cannot be null. Please provide a metricsSpec"
);
this.tuningConfig = Preconditions.checkNotNull(
tuningConfig,
"tuningConfig cannot be null. Please provide tuningConfig"
);
this.dataSourceName = dataSourceName == null
? StringUtils.format(
"%s-%s",
baseDataSource,
DigestUtils.sha1Hex(dimensionsSpec.toString()).substring(0, 8)
)
: dataSourceName;
this.hadoopCoordinates = hadoopCoordinates;
this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
this.classpathPrefix = classpathPrefix;
this.context = context == null ? Maps.newHashMap() : context;
this.objectMapper = objectMapper;
this.taskMaster = taskMaster;
this.taskStorage = taskStorage;
this.metadataSupervisorManager = metadataSupervisorManager;
this.segmentManager = segmentManager;
this.metadataStorageCoordinator = metadataStorageCoordinator;
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.config = config;
this.metrics = Sets.newHashSet();
for (AggregatorFactory aggregatorFactory : aggregators) {
metrics.add(aggregatorFactory.getName());
}
this.dimensions = Sets.newHashSet();
for (DimensionSchema schema : dimensionsSpec.getDimensions()) {
dimensions.add(schema.getName());
}
}
public HadoopIndexTask createTask(Interval interval, String version, List<DataSegment> segments)
{
String taskId = StringUtils.format("%s_%s_%s", TASK_PREFIX, dataSourceName, DateTimes.nowUtc());
// generate parser
Map<String, Object> parseSpec = Maps.newHashMap();
parseSpec.put("format", "timeAndDims");
parseSpec.put("dimensionsSpec", dimensionsSpec);
Map<String, Object> parser = Maps.newHashMap();
parser.put("type", "map");
parser.put("parseSpec", parseSpec);
//generate HadoopTuningConfig
HadoopTuningConfig tuningConfigForTask = new HadoopTuningConfig(
tuningConfig.getWorkingPath(),
version,
tuningConfig.getPartitionsSpec(),
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),
tuningConfig.isCleanupOnFailure(),
tuningConfig.isOverwriteFiles(),
tuningConfig.isIgnoreInvalidRows(),
tuningConfig.getJobProperties(),
tuningConfig.isCombineText(),
tuningConfig.getUseCombiner(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getBuildV9Directly(),
tuningConfig.getNumBackgroundPersistThreads(),
tuningConfig.isForceExtendableShardSpecs(),
true,
tuningConfig.getUserAllowedHadoopPrefix(),
tuningConfig.isLogParseExceptions(),
tuningConfig.getMaxParseExceptions()
);
// generate granularity
ArbitraryGranularitySpec granularitySpec = new ArbitraryGranularitySpec(
Granularities.NONE,
ImmutableList.of(interval)
);
// generate DataSchema
DataSchema dataSchema = new DataSchema(
dataSourceName,
parser,
aggregators,
granularitySpec,
TransformSpec.NONE,
objectMapper
);
// generate DatasourceIngestionSpec
DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
baseDataSource,
null,
ImmutableList.of(interval),
segments,
null,
null,
null,
false,
null
);
// generate HadoopIOConfig
Map<String, Object> inputSpec = Maps.newHashMap();
inputSpec.put("type", "dataSource");
inputSpec.put("ingestionSpec", datasourceIngestionSpec);
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(inputSpec, null, null);
// generate HadoopIngestionSpec
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, tuningConfigForTask);
// generate HadoopIndexTask
HadoopIndexTask task = new HadoopIndexTask(
taskId,
spec,
hadoopCoordinates,
hadoopDependencyCoordinates,
classpathPrefix,
objectMapper,
context,
authorizerMapper,
chatHandlerProvider
);
return task;
}
public Set<String> getDimensions()
{
return dimensions;
}
public Set<String> getMetrics()
{
return metrics;
}
@JsonProperty("baseDataSource")
public String getBaseDataSource()
{
return baseDataSource;
}
@JsonProperty("dimensionsSpec")
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
@JsonProperty("metricsSpec")
public AggregatorFactory[] getMetricsSpec()
{
return aggregators;
}
@JsonProperty("tuningConfig")
public HadoopTuningConfig getTuningConfig()
{
return tuningConfig;
}
@JsonProperty("dataSource")
public String getDataSourceName()
{
return dataSourceName;
}
@JsonProperty("hadoopCoordinates")
public String getHadoopCoordinates()
{
return hadoopCoordinates;
}
@JsonProperty("hadoopDependencyCoordinates")
public List<String> getSadoopDependencyCoordinates()
{
return hadoopDependencyCoordinates;
}
@JsonProperty("classpathPrefix")
public String getClasspathPrefix()
{
return classpathPrefix;
}
@JsonProperty("context")
public Map<String, Object> getContext()
{
return context;
}
@Override
public String getId()
{
return StringUtils.format("MaterializedViewSupervisor-%s", dataSourceName);
}
@Override
public Supervisor createSupervisor()
{
return new MaterializedViewSupervisor(
taskMaster,
taskStorage,
metadataSupervisorManager,
segmentManager,
metadataStorageCoordinator,
config,
this
);
}
@Override
public List<String> getDataSources()
{
return ImmutableList.of(dataSourceName);
}
@Override
public String toString()
{
return "MaterializedViewSupervisorSpec{" +
"baseDataSource=" + baseDataSource +
", dimensions=" + dimensions +
", metrics=" + metrics +
'}';
}
}

View File

@@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
public class MaterializedViewTaskConfig
{
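// How often the supervisor checks segments of the base and derived dataSources and submits new tasks.
// Bound under the druid.materialized.view.task runtime property prefix (see MaterializedViewMaintenanceDruidModule); defaults to one minute (PT1M).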
@JsonProperty
private Period taskCheckDuration = new Period("PT1M");
public Period getTaskCheckDuration()
{
return taskCheckDuration;
}
}

View File

@@ -0,0 +1 @@
io.druid.indexing.materializedview.MaterializedViewMaintenanceDruidModule

View File

@@ -0,0 +1,146 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.math.expr.ExprMacroTable;
import io.druid.metadata.MetadataSupervisorManager;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.expression.LookupEnabledTestExprMacroTable;
import io.druid.segment.TestHelper;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.server.security.AuthorizerMapper;
import static org.easymock.EasyMock.createMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class MaterializedViewSupervisorSpecTest
{
private ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@Before
public void setup()
{
objectMapper.registerSubtypes(new NamedType(MaterializedViewSupervisorSpec.class, "derivativeDataSource"));
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(TaskMaster.class, null)
.addValue(TaskStorage.class, null)
.addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class, objectMapper)
.addValue(MetadataSupervisorManager.class, null)
.addValue(SQLMetadataSegmentManager.class, null)
.addValue(IndexerMetadataStorageCoordinator.class, null)
.addValue(MaterializedViewTaskConfig.class, new MaterializedViewTaskConfig())
.addValue(AuthorizerMapper.class, createMock(AuthorizerMapper.class))
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
);
}
@Test
public void testSupervisorSerialization() throws IOException
{
String supervisorStr = "{\n" +
" \"type\" : \"derivativeDataSource\",\n" +
" \"baseDataSource\": \"wikiticker\",\n" +
" \"dimensionsSpec\":{\n" +
" \"dimensions\" : [\n" +
" \"isUnpatrolled\",\n" +
" \"metroCode\",\n" +
" \"namespace\",\n" +
" \"page\",\n" +
" \"regionIsoCode\",\n" +
" \"regionName\",\n" +
" \"user\"\n" +
" ]\n" +
" },\n" +
" \"metricsSpec\" : [\n" +
" {\n" +
" \"name\" : \"count\",\n" +
" \"type\" : \"count\"\n" +
" },\n" +
" {\n" +
" \"name\" : \"added\",\n" +
" \"type\" : \"longSum\",\n" +
" \"fieldName\" : \"added\"\n" +
" }\n" +
" ],\n" +
" \"tuningConfig\": {\n" +
" \"type\" : \"hadoop\"\n" +
" }\n" +
"}";
MaterializedViewSupervisorSpec expected = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
),
null,
null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider()
);
MaterializedViewSupervisorSpec spec = objectMapper.readValue(supervisorStr, MaterializedViewSupervisorSpec.class);
Assert.assertEquals(expected.getBaseDataSource(), spec.getBaseDataSource());
Assert.assertEquals(expected.getId(), spec.getId());
Assert.assertEquals(expected.getDataSourceName(), spec.getDataSourceName());
Assert.assertEquals(expected.getDimensions(), spec.getDimensions());
Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
}
}

View File

@@ -0,0 +1,178 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.materializedview;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataSupervisorManager;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.TestHelper;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.security.AuthorizerMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import static org.easymock.EasyMock.expect;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import static org.easymock.EasyMock.createMock;
import org.junit.rules.ExpectedException;
public class MaterializedViewSupervisorTest
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private TestDerbyConnector derbyConnector;
private TaskStorage taskStorage;
private TaskMaster taskMaster;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private MetadataSupervisorManager metadataSupervisorManager;
private SQLMetadataSegmentManager sqlMetadataSegmentManager;
private TaskQueue taskQueue;
private MaterializedViewSupervisor supervisor;
private MaterializedViewSupervisorSpec spec;
private ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@Before
public void setUp() throws IOException
{
derbyConnector = derbyConnectorRule.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createSegmentTable();
taskStorage = createMock(TaskStorage.class);
taskMaster = createMock(TaskMaster.class);
indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector
);
metadataSupervisorManager = createMock(MetadataSupervisorManager.class);
sqlMetadataSegmentManager = createMock(SQLMetadataSegmentManager.class);
taskQueue = createMock(TaskQueue.class);
taskQueue.start();
objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
spec = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Lists.newArrayList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
objectMapper,
taskMaster,
taskStorage,
metadataSupervisorManager,
sqlMetadataSegmentManager,
indexerMetadataStorageCoordinator,
new MaterializedViewTaskConfig(),
createMock(AuthorizerMapper.class),
createMock(ChatHandlerProvider.class)
);
supervisor = (MaterializedViewSupervisor) spec.createSupervisor();
}
@Test
public void testCheckSegments() throws IOException
{
Set<DataSegment> baseSegments = Sets.newHashSet(
new DataSegment(
"base",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"2015-01-02",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, null, null),
9,
1024
),
new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, null, null),
9,
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.<TaskRunner>absent()).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> toBuildInterval = supervisor.checkSegments();
Map<Interval, List<DataSegment>> expectedSegments = Maps.newHashMap();
expectedSegments.put(
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
Lists.newArrayList(
new DataSegment(
"base",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"2015-01-02",
ImmutableMap.<String, Object>of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, null, null),
9,
1024
)
)
);
Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
}
}

View File

@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.13.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>materialized-view-selection</artifactId>
<name>materialized-view-selection</name>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>materialized-view-maintenance</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,213 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.client.TimelineServerView;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
import io.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class DataSourceOptimizer
{
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final TimelineServerView serverView;
private ConcurrentHashMap<String, AtomicLong> derivativesHitCount = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, AtomicLong> totalCount = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, AtomicLong> costTime = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFields = new ConcurrentHashMap<>();
@Inject
public DataSourceOptimizer(TimelineServerView serverView)
{
this.serverView = serverView;
}
/**
* Do the main work of materialized view selection: transform the user query into one or more sub-queries.
*
* In each sub-query, the dataSource is a derivative of the dataSource in the user query, and the union of all
* sub-queries' intervals equals the intervals in the user query.
*
* The derived dataSource with the smallest average data size per segment granularity has the highest priority
* to replace the dataSource in the user query.
*
* @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized
* @return a list of queries with specified derived dataSources and intervals
*/
public List<Query> optimize(Query query)
{
long start = System.currentTimeMillis();
// only topN/timeseries/groupBy queries can be optimized
// only TableDataSource can be optimized
if (!(query instanceof TopNQuery || query instanceof TimeseriesQuery || query instanceof GroupByQuery)
|| !(query.getDataSource() instanceof TableDataSource)) {
return Collections.singletonList(query);
}
String datasourceName = ((TableDataSource) query.getDataSource()).getName();
// get all derivatives for the datasource in the query. The derivative set is sorted by average data size
// per segment granularity.
Set<DerivativeDataSource> derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName);
if (derivatives.isEmpty()) {
return Lists.newArrayList(query);
}
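// hold the read lock while recording stats, so getAndResetStats() can snapshot and reset them atomically
// under the write lock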
lock.readLock().lock();
try {
totalCount.putIfAbsent(datasourceName, new AtomicLong(0));
hitCount.putIfAbsent(datasourceName, new AtomicLong(0));
costTime.putIfAbsent(datasourceName, new AtomicLong(0));
totalCount.get(datasourceName).incrementAndGet();
// get all fields that the query requires
Set<String> requiredFields = MaterializedViewUtils.getRequiredFields(query);
Set<DerivativeDataSource> derivativesWithRequiredFields = Sets.newHashSet();
for (DerivativeDataSource derivativeDataSource : derivatives) {
derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0));
if (derivativeDataSource.getColumns().containsAll(requiredFields)) {
derivativesWithRequiredFields.add(derivativeDataSource);
}
}
// if no derivative contains all required fields, materialized view selection fails for this query.
if (derivativesWithRequiredFields.isEmpty()) {
missFields.putIfAbsent(datasourceName, new ConcurrentHashMap<>());
missFields.get(datasourceName).putIfAbsent(requiredFields, new AtomicLong(0));
missFields.get(datasourceName).get(requiredFields).incrementAndGet();
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return Lists.newArrayList(query);
}
List<Query> queries = Lists.newArrayList();
List<Interval> remainingQueryIntervals = (List<Interval>) query.getIntervals();
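// try derivatives in ascending order of average segment size; each selected derivative takes over the
// query intervals that its segments cover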
for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
final List<Interval> derivativeIntervals = remainingQueryIntervals.stream()
.flatMap(interval -> serverView
.getTimeline((new TableDataSource(derivativeDataSource.getName())))
.lookup(interval)
.stream()
.map(TimelineObjectHolder::getInterval)
)
.collect(Collectors.toList());
// if the derivative does not cover any part of the query intervals, it will not be selected.
if (derivativeIntervals.isEmpty()) {
continue;
}
remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals);
queries.add(
query.withDataSource(new TableDataSource(derivativeDataSource.getName()))
.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(derivativeIntervals))
);
derivativesHitCount.get(derivativeDataSource.getName()).incrementAndGet();
if (remainingQueryIntervals.isEmpty()) {
break;
}
}
if (queries.isEmpty()) {
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return Lists.newArrayList(query);
}
// after materialized view selection, the results for the remaining query intervals are computed from
// the original datasource.
if (!remainingQueryIntervals.isEmpty()) {
queries.add(query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(remainingQueryIntervals)));
}
hitCount.get(datasourceName).incrementAndGet();
costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start);
return queries;
}
finally {
lock.readLock().unlock();
}
}
public List<DataSourceOptimizerStats> getAndResetStats()
{
ImmutableMap<String, AtomicLong> derivativesHitCountSnapshot;
ImmutableMap<String, AtomicLong> totalCountSnapshot;
ImmutableMap<String, AtomicLong> hitCountSnapshot;
ImmutableMap<String, AtomicLong> costTimeSnapshot;
ImmutableMap<String, ConcurrentHashMap<Set<String>, AtomicLong>> missFieldsSnapshot;
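// take the write lock so the snapshot and reset appear atomic to concurrent optimize() calls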
lock.writeLock().lock();
try {
derivativesHitCountSnapshot = ImmutableMap.copyOf(derivativesHitCount);
totalCountSnapshot = ImmutableMap.copyOf(totalCount);
hitCountSnapshot = ImmutableMap.copyOf(hitCount);
costTimeSnapshot = ImmutableMap.copyOf(costTime);
missFieldsSnapshot = ImmutableMap.copyOf(missFields);
derivativesHitCount.clear();
totalCount.clear();
hitCount.clear();
costTime.clear();
missFields.clear();
}
finally {
lock.writeLock().unlock();
}
List<DataSourceOptimizerStats> stats = Lists.newArrayList();
Map<String, Set<DerivativeDataSource>> baseToDerivatives = DerivativeDataSourceManager.getAllDerivatives();
for (Map.Entry<String, Set<DerivativeDataSource>> entry : baseToDerivatives.entrySet()) {
Map<String, Long> derivativesStat = Maps.newHashMap();
for (DerivativeDataSource derivative : entry.getValue()) {
derivativesStat.put(
derivative.getName(),
derivativesHitCountSnapshot.getOrDefault(derivative.getName(), new AtomicLong(0)).get()
);
}
stats.add(
new DataSourceOptimizerStats(
entry.getKey(),
hitCountSnapshot.getOrDefault(entry.getKey(), new AtomicLong(0)).get(),
totalCountSnapshot.getOrDefault(entry.getKey(), new AtomicLong(0)).get(),
costTimeSnapshot.getOrDefault(entry.getKey(), new AtomicLong(0)).get(),
missFieldsSnapshot.getOrDefault(entry.getKey(), new ConcurrentHashMap<>()),
derivativesStat
)
);
}
return stats;
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.google.inject.Inject;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.metrics.AbstractMonitor;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DataSourceOptimizerMonitor extends AbstractMonitor
{
private final DataSourceOptimizer optimizer;
@Inject
public DataSourceOptimizerMonitor(DataSourceOptimizer optimizer)
{
this.optimizer = optimizer;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
List<DataSourceOptimizerStats> stats = optimizer.getAndResetStats();
for (DataSourceOptimizerStats stat : stats) {
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
builder.setDimension("dataSource", stat.getBase());
emitter.emit(builder.build("/materialized/view/query/totalNum", stat.getTotalcount()));
emitter.emit(builder.build("/materialized/view/query/hits", stat.getHitcount()));
emitter.emit(builder.build("/materialized/view/query/hitRate", stat.getHitRate()));
emitter.emit(builder.build("/materialized/view/select/avgCostMS", stat.getOptimizerCost()));
Map<String, Long> derivativesStats = stat.getDerivativesHitCount();
for (String derivative : derivativesStats.keySet()) {
builder.setDimension("derivative", derivative);
emitter.emit(builder.build("/materialized/view/derivative/numSelected", derivativesStats.get(derivative)));
}
final ServiceMetricEvent.Builder builder2 = new ServiceMetricEvent.Builder();
builder2.setDimension("dataSource", stat.getBase());
for (Set<String> fields : stat.getMissFields().keySet()) {
builder2.setDimension("fields", fields.toString());
emitter.emit(builder2.build("/materialized/view/missNum", stat.getMissFields().get(fields).get()));
}
}
return true;
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class DataSourceOptimizerStats
{
private final String base;
private final long hitcount;
private final long totalcount;
private final long optimizerCost;
private final Map<Set<String>, AtomicLong> missFields;
private final Map<String, Long> derivativesHitCount;
public DataSourceOptimizerStats(
String base,
long hitcount,
long totalcount,
long optimizerCost,
Map<Set<String>, AtomicLong> missFields,
Map<String, Long> derivativesHitCount
)
{
this.base = base;
this.hitcount = hitcount;
this.totalcount = totalcount;
this.optimizerCost = optimizerCost;
this.missFields = missFields;
this.derivativesHitCount = derivativesHitCount;
}
public Map<Set<String>, AtomicLong> getMissFields()
{
return missFields;
}
public String getBase()
{
return base;
}
public long getHitcount()
{
return hitcount;
}
public long getTotalcount()
{
return totalcount;
}
public double getOptimizerCost()
{
if (totalcount == 0L) {
return 0;
}
return ((double) optimizerCost) / totalcount;
}
public double getHitRate()
{
if (totalcount == 0L) {
return 0;
}
return ((double) hitcount) / totalcount;
}
public Map<String, Long> getDerivativesHitCount()
{
return derivativesHitCount;
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.google.common.base.Preconditions;
import java.util.Objects;
import java.util.Set;
public class DerivativeDataSource implements Comparable<DerivativeDataSource>
{
private final String name;
private final String baseDataSource;
private final Set<String> columns;
private final long avgSizeBasedGranularity;
public DerivativeDataSource(String name, String baseDataSource, Set<String> columns, long size)
{
this.name = Preconditions.checkNotNull(name, "name");
this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource");
this.columns = Preconditions.checkNotNull(columns, "columns");
this.avgSizeBasedGranularity = size;
}
public String getName()
{
return name;
}
public String getBaseDataSource()
{
return baseDataSource;
}
public Set<String> getColumns()
{
return columns;
}
public long getAvgSizeBasedGranularity()
{
return avgSizeBasedGranularity;
}
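// derivatives are ordered by average data size per segment granularity; the optimizer prefers the smallest one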
@Override
public int compareTo(DerivativeDataSource o)
{
if (this.avgSizeBasedGranularity > o.getAvgSizeBasedGranularity()) {
return 1;
} else if (this.avgSizeBasedGranularity == o.getAvgSizeBasedGranularity()) {
return 0;
} else {
return -1;
}
}
@Override
public boolean equals(Object o)
{
if (o == null) {
return false;
}
if (!(o instanceof DerivativeDataSource)) {
return false;
}
DerivativeDataSource that = (DerivativeDataSource) o;
return name.equals(that.getName())
&& baseDataSource.equals(that.getBaseDataSource())
&& columns.equals(that.getColumns());
}
@Override
public int hashCode()
{
return Objects.hash(name, baseDataSource, columns);
}
}

View File

@ -0,0 +1,265 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.guice.ManageLifecycleLast;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Periodically reads derivative dataSource information from the dataSource metadata table and caches it.
* During query optimization, this manager provides the information about available derivatives.
*/
@ManageLifecycleLast
public class DerivativeDataSourceManager
{
private static final EmittingLogger log = new EmittingLogger(DerivativeDataSourceManager.class);
private static final AtomicReference<ConcurrentHashMap<String, SortedSet<DerivativeDataSource>>> derivativesRef =
new AtomicReference<>(new ConcurrentHashMap<>());
private final MaterializedViewConfig config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;
private final ObjectMapper objectMapper;
private final Object lock = new Object();
private boolean started = false;
private ListeningScheduledExecutorService exec = null;
private ListenableFuture<?> future = null;
@Inject
public DerivativeDataSourceManager(
MaterializedViewConfig config,
Supplier<MetadataStorageTablesConfig> dbTables,
ObjectMapper objectMapper,
SQLMetadataConnector connector
)
{
this.config = config;
this.dbTables = dbTables;
this.objectMapper = objectMapper;
this.connector = connector;
}
@LifecycleStart
public void start()
{
log.info("starting derivatives manager.");
synchronized (lock) {
if (started) {
return;
}
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DerivativeDataSourceManager-Exec-%d"));
final Duration delay = config.getPollDuration().toStandardDuration();
future = exec.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run()
{
try {
updateDerivatives();
}
catch (Exception e) {
log.makeAlert(e, "uncaught exception in derivatives manager updating thread").emit();
}
}
},
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
}
log.info("Derivatives manager started.");
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
started = false;
future.cancel(true);
future = null;
derivativesRef.set(new ConcurrentHashMap<>());
exec.shutdownNow();
exec = null;
}
}
public static ImmutableSet<DerivativeDataSource> getDerivatives(String datasource)
{
return ImmutableSet.copyOf(derivativesRef.get().getOrDefault(datasource, Sets.newTreeSet()));
}
public static ImmutableMap<String, Set<DerivativeDataSource>> getAllDerivatives()
{
return ImmutableMap.copyOf(derivativesRef.get());
}
private void updateDerivatives()
{
List<Pair<String, DerivativeDataSourceMetadata>> derivativesInDatabase = connector.retryWithHandle(
handle ->
handle.createQuery(
StringUtils.format("SELECT DISTINCT dataSource,commit_metadata_payload FROM %1$s", dbTables.get().getDataSourceTable())
)
.map(new ResultSetMapper<Pair<String, DerivativeDataSourceMetadata>>()
{
@Override
public Pair<String, DerivativeDataSourceMetadata> map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
String datasourceName = r.getString("dataSource");
try {
DataSourceMetadata payload = objectMapper.readValue(
r.getBytes("commit_metadata_payload"),
DataSourceMetadata.class);
if (!(payload instanceof DerivativeDataSourceMetadata)) {
return null;
}
DerivativeDataSourceMetadata metadata = (DerivativeDataSourceMetadata) payload;
return new Pair<>(datasourceName, metadata);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
})
.list()
);
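// resolve each metadata row into a DerivativeDataSource; entries whose segments report no usable size are dropped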
List<DerivativeDataSource> derivativeDataSources = derivativesInDatabase.parallelStream()
.filter(data -> data != null)
.map(derivatives -> {
String name = derivatives.lhs;
DerivativeDataSourceMetadata metadata = derivatives.rhs;
String baseDataSource = metadata.getBaseDataSource();
long avgSizePerGranularity = getAvgSizePerGranularity(name);
log.info("find derivatives: {bases=%s, derivative=%s, dimensions=%s, metrics=%s, avgSize=%s}",
baseDataSource, name, metadata.getDimensions(), metadata.getMetrics(), avgSizePerGranularity);
return new DerivativeDataSource(name, baseDataSource, metadata.getColumns(), avgSizePerGranularity);
})
.filter(derivatives -> derivatives.getAvgSizeBasedGranularity() > 0)
.collect(Collectors.toList());
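// group derivatives by base dataSource; the TreeSet keeps each group sorted by average segment size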
ConcurrentHashMap<String, SortedSet<DerivativeDataSource>> newDerivatives = new ConcurrentHashMap<>();
for (DerivativeDataSource derivative : derivativeDataSources) {
newDerivatives.putIfAbsent(derivative.getBaseDataSource(), Sets.newTreeSet());
newDerivatives.get(derivative.getBaseDataSource()).add(derivative);
}
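// atomically publish the freshly built snapshot so readers never observe a partially updated map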
ConcurrentHashMap<String, SortedSet<DerivativeDataSource>> current;
do {
current = derivativesRef.get();
} while (!derivativesRef.compareAndSet(current, newDerivatives));
}
/**
* calculate the average data size per segment granularity for a given datasource.
*
* e.g. for a datasource with the following 5 segments:
* interval = "2018-04-01/2018-04-02", segment size = 1024 * 1024 * 2
* interval = "2018-04-01/2018-04-02", segment size = 1024 * 1024 * 2
* interval = "2018-04-02/2018-04-03", segment size = 1024 * 1024 * 1
* interval = "2018-04-02/2018-04-03", segment size = 1024 * 1024 * 1
* interval = "2018-04-02/2018-04-03", segment size = 1024 * 1024 * 1
* the number of distinct intervals is 2 and the total segment size is 1024 * 1024 * 7,
* so the result is 1024 * 1024 * 7 / 2 = 1024 * 1024 * 3.5.
*
* @param datasource name of the datasource
* @return average data size per segment granularity
*/
private long getAvgSizePerGranularity(String datasource)
{
return connector.retryWithHandle(
new HandleCallback<Long>() {
Set<Interval> intervals = Sets.newHashSet();
long totalSize = 0;
@Override
public Long withHandle(Handle handle)
{
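// sum the sizes of all used segments and count the distinct intervals they cover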
handle.createQuery(
StringUtils.format("SELECT start,%1$send%1$s,payload FROM %2$s WHERE used = true AND dataSource = :dataSource",
connector.getQuoteString(), dbTables.get().getSegmentsTable()
)
)
.bind("dataSource", datasource)
.map(
new ResultSetMapper<Object>()
{
@Override
public Object map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
intervals.add(Intervals.utc(DateTimes.of(r.getString("start")).getMillis(), DateTimes.of(r.getString("end")).getMillis()));
DataSegment segment = objectMapper.readValue(r.getBytes("payload"), DataSegment.class);
totalSize += segment.getSize();
}
catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
}
)
.first();
return intervals.isEmpty() ? 0L : totalSize / intervals.size();
}
}
);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
public class MaterializedViewConfig
{
@JsonProperty
private Period pollDuration = new Period("PT1M");
public Period getPollDuration()
{
return pollDuration;
}
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.filter.DimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* MaterializedViewQuery helps to do materialized view selection automatically.
*
* Each MaterializedViewQuery wraps a real query whose type can be topN, timeseries or groupBy.
* The real query is optimized based on its dataSource and intervals: it is converted into one or more
* sub-queries in which the dataSource and intervals are replaced by derived dataSources and the related sub-intervals.
*
* Derived dataSources always have fewer dimensions, but contain all the dimensions the real query requires.
*/
public class MaterializedViewQuery<T> implements Query<T>
{
public static final String TYPE = "view";
private final Query query;
private final DataSourceOptimizer optimizer;
@JsonCreator
public MaterializedViewQuery(
@JsonProperty("query") Query query,
@JacksonInject DataSourceOptimizer optimizer
)
{
Preconditions.checkArgument(
query instanceof TopNQuery || query instanceof TimeseriesQuery || query instanceof GroupByQuery,
"Only topN/timeseries/groupby query are supported"
);
this.query = query;
this.optimizer = optimizer;
}
@JsonProperty("query")
public Query getQuery()
{
return query;
}
public DataSourceOptimizer getOptimizer()
{
return optimizer;
}
@Override
public DataSource getDataSource()
{
return query.getDataSource();
}
@Override
public boolean hasFilters()
{
return query.hasFilters();
}
@Override
public DimFilter getFilter()
{
return query.getFilter();
}
@Override
public String getType()
{
return query.getType();
}
@Override
public QueryRunner<T> getRunner(QuerySegmentWalker walker)
{
return ((BaseQuery) query).getQuerySegmentSpec().lookup(this, walker);
}
@Override
public List<Interval> getIntervals()
{
return query.getIntervals();
}
@Override
public Duration getDuration()
{
return query.getDuration();
}
@Override
public Granularity getGranularity()
{
return query.getGranularity();
}
@Override
public DateTimeZone getTimezone()
{
return query.getTimezone();
}
@Override
public Map<String, Object> getContext()
{
return query.getContext();
}
@Override
public <ContextType> ContextType getContextValue(String key)
{
return (ContextType) query.getContextValue(key);
}
@Override
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue)
{
return (ContextType) query.getContextValue(key, defaultValue);
}
@Override
public boolean getContextBoolean(String key, boolean defaultValue)
{
return query.getContextBoolean(key, defaultValue);
}
@Override
public boolean isDescending()
{
return query.isDescending();
}
@Override
public Ordering<T> getResultOrdering()
{
return query.getResultOrdering();
}
@Override
public MaterializedViewQuery withOverriddenContext(Map<String, Object> contextOverride)
{
return new MaterializedViewQuery(query.withOverriddenContext(contextOverride), optimizer);
}
@Override
public MaterializedViewQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new MaterializedViewQuery(query.withQuerySegmentSpec(spec), optimizer);
}
@Override
public MaterializedViewQuery withId(String id)
{
return new MaterializedViewQuery(query.withId(id), optimizer);
}
@Override
public String getId()
{
return query.getId();
}
@Override
public MaterializedViewQuery withDataSource(DataSource dataSource)
{
return new MaterializedViewQuery(query.withDataSource(dataSource), optimizer);
}
@Override
public String toString()
{
return "MaterializedViewQuery{" +
"query=" + query.toString() +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MaterializedViewQuery other = (MaterializedViewQuery) o;
return other.getQuery().equals(query);
}
@Override
public int hashCode()
{
return Objects.hash(TYPE, query);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.inject.Inject;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.aggregation.MetricManipulationFn;
import java.util.Map;
public class MaterializedViewQueryQueryToolChest extends QueryToolChest
{
private final QueryToolChestWarehouse warehouse;
private DataSourceOptimizer optimizer;
@Inject
public MaterializedViewQueryQueryToolChest(
QueryToolChestWarehouse warehouse
)
{
this.warehouse = warehouse;
}
@Override
public QueryRunner mergeResults(QueryRunner runner)
{
return new QueryRunner() {
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
return warehouse.getToolChest(realQuery).mergeResults(runner).run(queryPlus.withQuery(realQuery), responseContext);
}
};
}
@Override
public QueryMetrics makeMetrics(Query query)
{
Query realQuery = getRealQuery(query);
return warehouse.getToolChest(realQuery).makeMetrics(realQuery);
}
@Override
public Function makePreComputeManipulatorFn(Query query, MetricManipulationFn fn)
{
Query realQuery = getRealQuery(query);
return warehouse.getToolChest(realQuery).makePreComputeManipulatorFn(realQuery, fn);
}
@Override
public TypeReference getResultTypeReference()
{
return null;
}
@Override
public QueryRunner preMergeQueryDecoration(final QueryRunner runner)
{
return new QueryRunner() {
@Override
public Sequence run(QueryPlus queryPlus, Map responseContext)
{
Query realQuery = getRealQuery(queryPlus.getQuery());
QueryToolChest realQueryToolChest = warehouse.getToolChest(realQuery);
QueryRunner realQueryRunner = realQueryToolChest.preMergeQueryDecoration(
new MaterializedViewQueryRunner(runner, optimizer)
);
return realQueryRunner.run(queryPlus.withQuery(realQuery), responseContext);
}
};
}
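// unwrap the MaterializedViewQuery and remember its optimizer, so the delegated tool chest and runner
// operate on the inner query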
public Query getRealQuery(Query query)
{
if (query instanceof MaterializedViewQuery) {
optimizer = ((MaterializedViewQuery) query).getOptimizer();
return ((MaterializedViewQuery) query).getQuery();
}
return query;
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import java.util.Map;
public class MaterializedViewQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner runner;
private final DataSourceOptimizer optimizer;
public MaterializedViewQueryRunner(QueryRunner queryRunner, DataSourceOptimizer optimizer)
{
this.runner = queryRunner;
this.optimizer = optimizer;
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
Query query = queryPlus.getQuery();
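// run every optimized sub-query and merge the resulting sequences using the original query's result ordering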
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(
Lists.transform(
optimizer.optimize(query),
new Function<Query, Sequence<T>>()
{
@Override
public Sequence<T> apply(Query query)
{
return runner.run(
queryPlus.withQuery(query),
responseContext
);
}
}
)
)
);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Singleton;
import io.druid.guice.DruidBinders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LifecycleModule;
import io.druid.initialization.DruidModule;
import io.druid.server.metrics.MetricsModule;
import java.util.List;
public class MaterializedViewSelectionDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(MaterializedViewQuery.class, MaterializedViewQuery.TYPE))
);
}
@Override
public void configure(Binder binder)
{
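// register the view query tool chest, manage the derivatives manager lifecycle, expose optimizer metrics,
// and bind the druid.manager.derivatives poll configuration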
DruidBinders.queryToolChestBinder(binder)
.addBinding(MaterializedViewQuery.class)
.to(MaterializedViewQueryQueryToolChest.class);
LifecycleModule.register(binder, DerivativeDataSourceManager.class);
binder.bind(DataSourceOptimizer.class).in(Singleton.class);
MetricsModule.register(binder, DataSourceOptimizerMonitor.class);
JsonConfigProvider.bind(binder, "druid.manager.derivatives", MaterializedViewConfig.class);
}
}

View File

@ -0,0 +1,292 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.JodaUtils;
import io.druid.query.Query;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.BoundDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.InDimFilter;
import io.druid.query.filter.IntervalDimFilter;
import io.druid.query.filter.LikeDimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.RegexDimFilter;
import io.druid.query.filter.SearchQueryDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
import org.joda.time.Interval;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class MaterializedViewUtils
{
/**
* extract all fields (dimensions and aggregator inputs) that the query requires.
* only TopNQuery/TimeseriesQuery/GroupByQuery are supported.
*
* @param query the query to inspect
* @return the set of fields required by the query
*/
public static Set<String> getRequiredFields(Query query)
{
Set<String> dimensions = new HashSet<>();
Set<String> dimsInFilter = getDimensionsInFilter(query.getFilter());
if (dimsInFilter != null) {
dimensions.addAll(dimsInFilter);
}
if (query instanceof TopNQuery) {
TopNQuery q = (TopNQuery) query;
dimensions.addAll(extractFieldsFromAggregations(q.getAggregatorSpecs()));
dimensions.add(q.getDimensionSpec().getDimension());
} else if (query instanceof TimeseriesQuery) {
TimeseriesQuery q = (TimeseriesQuery) query;
dimensions.addAll(extractFieldsFromAggregations(q.getAggregatorSpecs()));
} else if (query instanceof GroupByQuery) {
GroupByQuery q = (GroupByQuery) query;
dimensions.addAll(extractFieldsFromAggregations(q.getAggregatorSpecs()));
for (DimensionSpec spec : q.getDimensions()) {
String dim = spec.getDimension();
dimensions.add(dim);
}
} else {
throw new UnsupportedOperationException("Method getRequeiredFields only support TopNQuery/TimeseriesQuery/GroupByQuery");
}
return dimensions;
}
private static Set<String> extractFieldsFromAggregations(List<AggregatorFactory> aggs)
{
Set<String> ret = new HashSet<>();
for (AggregatorFactory agg : aggs) {
if (agg instanceof FilteredAggregatorFactory) {
FilteredAggregatorFactory fagg = (FilteredAggregatorFactory) agg;
ret.addAll(getDimensionsInFilter(fagg.getFilter()));
}
ret.addAll(agg.requiredFields());
}
return ret;
}
private static Set<String> getDimensionsInFilter(DimFilter dimFilter)
{
if (dimFilter instanceof AndDimFilter) {
AndDimFilter d = (AndDimFilter) dimFilter;
Set<String> ret = new HashSet<>();
for (DimFilter filter : d.getFields()) {
ret.addAll(getDimensionsInFilter(filter));
}
return ret;
} else if (dimFilter instanceof OrDimFilter) {
OrDimFilter d = (OrDimFilter) dimFilter;
Set<String> ret = new HashSet<>();
for (DimFilter filter : d.getFields()) {
ret.addAll(getDimensionsInFilter(filter));
}
return ret;
} else if (dimFilter instanceof NotDimFilter) {
NotDimFilter d = (NotDimFilter) dimFilter;
return getDimensionsInFilter(d.getField());
} else if (dimFilter instanceof BoundDimFilter) {
BoundDimFilter d = (BoundDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof InDimFilter) {
InDimFilter d = (InDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof IntervalDimFilter) {
IntervalDimFilter d = (IntervalDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof LikeDimFilter) {
LikeDimFilter d = (LikeDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof RegexDimFilter) {
RegexDimFilter d = (RegexDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof SearchQueryDimFilter) {
SearchQueryDimFilter d = (SearchQueryDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else if (dimFilter instanceof SelectorDimFilter) {
SelectorDimFilter d = (SelectorDimFilter) dimFilter;
return Sets.newHashSet(d.getDimension());
} else {
return null;
}
}
/**
* calculate the intervals that are covered by interval2 but not covered by interval1,
* i.e. result intervals = interval2 - (interval1 intersect interval2)
* e.g.
* a list of interval2: ["2018-04-01T00:00:00.000Z/2018-04-02T00:00:00.000Z",
* "2018-04-03T00:00:00.000Z/2018-04-10T00:00:00.000Z"]
* a list of interval1: ["2018-04-04T00:00:00.000Z/2018-04-06T00:00:00.000Z"]
* the result list of intervals: ["2018-04-01T00:00:00.000Z/2018-04-02T00:00:00.000Z",
* "2018-04-03T00:00:00.000Z/2018-04-04T00:00:00.000Z",
* "2018-04-06T00:00:00.000Z/2018-04-10T00:00:00.000Z"]
* Note: if either list of intervals is empty, interval1 is returned unchanged.
* @param interval2 list of intervals
* @param interval1 list of intervals
* @return list of intervals covered by interval2 but not covered by interval1
*/
public static List<Interval> minus(List<Interval> interval2, List<Interval> interval1)
{
if (interval1.isEmpty() || interval2.isEmpty()) {
return interval1;
}
Iterator<Interval> it1 = JodaUtils.condenseIntervals(interval1).iterator();
Iterator<Interval> it2 = JodaUtils.condenseIntervals(interval2).iterator();
List<Interval> remaining = Lists.newArrayList();
Interval currInterval1 = it1.next();
Interval currInterval2 = it2.next();
long start1 = currInterval1.getStartMillis();
long end1 = currInterval1.getEndMillis();
long start2 = currInterval2.getStartMillis();
long end2 = currInterval2.getEndMillis();
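// sweep both condensed, ascending interval lists, emitting the parts of interval2 that interval1 does not cover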
while (true) {
if (start2 < start1 && end2 <= start1) {
remaining.add(Intervals.utc(start2, end2));
if (it2.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
} else {
break;
}
}
if (start2 < start1 && end2 > start1 && end2 < end1) {
remaining.add(Intervals.utc(start2, start1));
start1 = end2;
if (it2.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
} else {
break;
}
}
if (start2 < start1 && end2 == end1) {
remaining.add(Intervals.utc(start2, start1));
if (it2.hasNext() && it1.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
break;
}
}
if (start2 < start1 && end2 > end1) {
remaining.add(Intervals.utc(start2, start1));
start2 = end1;
if (it1.hasNext()) {
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
remaining.add(Intervals.utc(end1, end2));
break;
}
}
if (start2 == start1 && end2 >= start1 && end2 < end1) {
start1 = end2;
if (it2.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
} else {
break;
}
}
if (start2 == start1 && end2 > end1) {
start2 = end1;
if (it1.hasNext()) {
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
remaining.add(Intervals.utc(end1, end2));
break;
}
}
if (start2 > start1 && start2 < end1 && end2 < end1) {
start1 = end2;
if (it2.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
} else {
break;
}
}
if (start2 > start1 && start2 < end1 && end2 > end1) {
start2 = end1;
if (it1.hasNext()) {
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
remaining.add(Intervals.utc(end1, end2));
break;
}
}
if (start2 >= start1 && start2 <= end1 && end2 == end1) {
if (it2.hasNext() && it1.hasNext()) {
currInterval2 = it2.next();
start2 = currInterval2.getStartMillis();
end2 = currInterval2.getEndMillis();
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
break;
}
}
if (start2 >= end1 && end2 > end1) {
if (it1.hasNext()) {
currInterval1 = it1.next();
start1 = currInterval1.getStartMillis();
end1 = currInterval1.getEndMillis();
} else {
remaining.add(Intervals.utc(start2, end2));
break;
}
}
}
while (it2.hasNext()) {
remaining.add(Intervals.of(it2.next().toString()));
}
return remaining;
}
}

View File

@ -0,0 +1 @@
io.druid.query.materializedview.MaterializedViewSelectionDruidModule

View File

@ -0,0 +1,330 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.BrokerSegmentWatcherConfig;
import io.druid.client.BrokerServerView;
import io.druid.client.DruidServer;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.RandomServerSelectorStrategy;
import io.druid.curator.CuratorTestBase;
import io.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.http.client.HttpClient;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.TestDerbyConnector;
import io.druid.query.Query;
import static io.druid.query.QueryRunnerTestHelper.allGran;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.segment.TestHelper;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public class DatasourceOptimizerTest extends CuratorTestBase
{
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private TestDerbyConnector derbyConnector;
private DerivativeDataSourceManager derivativesManager;
private DruidServer druidServer;
private ObjectMapper jsonMapper;
private ZkPathsConfig zkPathsConfig;
private DataSourceOptimizer optimizer;
private MaterializedViewConfig viewConfig;
private IndexerSQLMetadataStorageCoordinator metadataStorageCoordinator;
private BatchServerInventoryView baseView;
private BrokerServerView brokerServerView;
@Before
public void setUp() throws Exception
{
derbyConnector = derbyConnectorRule.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createSegmentTable();
viewConfig = new MaterializedViewConfig();
jsonMapper = TestHelper.makeJsonMapper();
jsonMapper.registerSubtypes(new NamedType(DerivativeDataSourceMetadata.class, "view"));
metadataStorageCoordinator = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
derivativesManager = new DerivativeDataSourceManager(
viewConfig,
derbyConnectorRule.metadataTablesConfigSupplier(),
jsonMapper,
derbyConnector
);
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector
);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
zkPathsConfig = new ZkPathsConfig();
setupViews();
druidServer = new DruidServer(
"localhost:1234",
"localhost:1234",
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
setupZNodeForServer(druidServer, new ZkPathsConfig(), jsonMapper);
optimizer = new DataSourceOptimizer(brokerServerView);
}
@After
public void tearDown() throws IOException
{
baseView.stop();
tearDownServerAndCurator();
}
@Test(timeout = 10 * 1000)
public void testOptimize() throws InterruptedException
{
// insert datasource metadata
String dataSource = "derivative";
String baseDataSource = "base";
Set<String> dims = Sets.newHashSet("dim1", "dim2", "dim3");
Set<String> metrics = Sets.newHashSet("cost");
DerivativeDataSourceMetadata metadata = new DerivativeDataSourceMetadata(baseDataSource, dims, metrics);
metadataStorageCoordinator.insertDataSourceMetadata(dataSource, metadata);
// insert base datasource segments
List<Boolean> baseResult = Lists.transform(
ImmutableList.<String>of(
"2011-04-01/2011-04-02",
"2011-04-02/2011-04-03",
"2011-04-03/2011-04-04",
"2011-04-04/2011-04-05",
"2011-04-05/2011-04-06"
),
interval -> {
final DataSegment segment = createDataSegment(
"base",
interval,
"v1",
Lists.newArrayList("dim1", "dim2", "dim3", "dim4"),
1024 * 1024
);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
return false;
}
return true;
}
);
// insert derivative segments
List<Boolean> derivativeResult = Lists.transform(
ImmutableList.<String>of(
"2011-04-01/2011-04-02",
"2011-04-02/2011-04-03",
"2011-04-03/2011-04-04"
),
interval -> {
final DataSegment segment = createDataSegment("derivative", interval, "v1", Lists.newArrayList("dim1", "dim2", "dim3"), 1024);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
return false;
}
return true;
}
);
Assert.assertFalse(baseResult.contains(false));
Assert.assertFalse(derivativeResult.contains(false));
derivativesManager.start();
while (DerivativeDataSourceManager.getAllDerivatives().isEmpty()) {
TimeUnit.SECONDS.sleep(1L);
}
// build user query
TopNQuery userQuery = new TopNQueryBuilder()
.dataSource("base")
.granularity(allGran)
.dimension("dim1")
.metric("cost")
.threshold(4)
.intervals("2011-04-01/2011-04-06")
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new LongSumAggregatorFactory("cost", "cost")
)
)
.build();
List<Query> expectedQueryAfterOptimizing = Lists.newArrayList(
new TopNQueryBuilder()
.dataSource("derivative")
.granularity(allGran)
.dimension("dim1")
.metric("cost")
.threshold(4)
.intervals(new MultipleIntervalSegmentSpec(Lists.newArrayList(Intervals.of("2011-04-01/2011-04-04"))))
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new LongSumAggregatorFactory("cost", "cost")
)
)
.build(),
new TopNQueryBuilder()
.dataSource("base")
.granularity(allGran)
.dimension("dim1")
.metric("cost")
.threshold(4)
.intervals(new MultipleIntervalSegmentSpec(Lists.newArrayList(Intervals.of("2011-04-04/2011-04-06"))))
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new LongSumAggregatorFactory("cost", "cost")
)
)
.build()
);
Assert.assertEquals(expectedQueryAfterOptimizing, optimizer.optimize(userQuery));
derivativesManager.stop();
}
private DataSegment createDataSegment(String name, String intervalStr, String version, List<String> dims, long size)
{
return DataSegment.builder()
.dataSource(name)
.interval(Intervals.of(intervalStr))
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
"somewhere"
)
)
.version(version)
.dimensions(dims)
.metrics(ImmutableList.<String>of("cost"))
.shardSpec(NoneShardSpec.instance())
.binaryVersion(9)
.size(size)
.build();
}
private void setupViews() throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
curator,
jsonMapper,
Predicates.alwaysTrue()
)
{
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
super.registerSegmentCallback(
exec, new SegmentCallback()
{
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentAdded(server, segment);
return res;
}
@Override
public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
CallbackAction res = callback.segmentRemoved(server, segment);
return res;
}
@Override
public CallbackAction segmentViewInitialized()
{
CallbackAction res = callback.segmentViewInitialized();
return res;
}
}
);
}
};
brokerServerView = new BrokerServerView(
EasyMock.createMock(QueryToolChestWarehouse.class),
EasyMock.createMock(QueryWatcher.class),
getSmileMapper(),
EasyMock.createMock(HttpClient.class),
baseView,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter(),
new BrokerSegmentWatcherConfig()
);
baseView.start();
}
private ObjectMapper getSmileMapper()
{
final SmileFactory smileFactory = new SmileFactory();
smileFactory.configure(SmileGenerator.Feature.ENCODE_BINARY_AS_7BIT, false);
smileFactory.delegateToTextual(true);
final ObjectMapper retVal = new DefaultObjectMapper(smileFactory);
retVal.getFactory().setCodec(retVal);
return retVal;
}
}
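
The assertion above expects `optimizer.optimize(userQuery)` to split the user's 2011-04-01/2011-04-06 TopN into a derivative-backed query for the window the derived datasource covers and a base-datasource query for the remainder. A minimal sketch of that routing decision using plain Joda-Time interval math, assuming (as in this test) that the derivative's coverage starts at the query start and ends before the query end; `RoutingSketch` is an illustrative name, not part of the patch:

```java
import io.druid.java.util.common.Intervals;
import org.joda.time.Interval;

public class RoutingSketch
{
  public static void main(String[] args)
  {
    Interval queryInterval = Intervals.of("2011-04-01/2011-04-06");
    Interval derivativeCoverage = Intervals.of("2011-04-01/2011-04-04");

    // Window the derived datasource can answer: the overlap with its covered interval.
    Interval covered = queryInterval.overlap(derivativeCoverage);
    // Remainder that still has to be served by the base datasource.
    Interval remainder = new Interval(covered.getEnd(), queryInterval.getEnd());

    System.out.println("route to derivative: " + covered);   // covers 2011-04-01/2011-04-04
    System.out.println("route to base:       " + remainder); // covers 2011-04-04/2011-04-06
  }
}
```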

View File

@ -0,0 +1,101 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.math.expr.ExprMacroTable;
import io.druid.query.Query;
import static io.druid.query.QueryRunnerTestHelper.addRowsIndexConstant;
import static io.druid.query.QueryRunnerTestHelper.allGran;
import static io.druid.query.QueryRunnerTestHelper.commonDoubleAggregators;
import static io.druid.query.QueryRunnerTestHelper.dataSource;
import static io.druid.query.QueryRunnerTestHelper.fullOnInterval;
import static io.druid.query.QueryRunnerTestHelper.indexMetric;
import static io.druid.query.QueryRunnerTestHelper.marketDimension;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.expression.LookupEnabledTestExprMacroTable;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.segment.TestHelper;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
public class MaterializedViewQueryTest
{
private static final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private DataSourceOptimizer optimizer;
@Before
public void setUp()
{
jsonMapper.registerSubtypes(new NamedType(MaterializedViewQuery.class, MaterializedViewQuery.TYPE));
optimizer = EasyMock.createMock(DataSourceOptimizer.class);
jsonMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
.addValue(DataSourceOptimizer.class, optimizer)
);
}
@Test
public void testQuerySerialization() throws IOException
{
TopNQuery topNQuery = new TopNQueryBuilder()
.dataSource(dataSource)
.granularity(allGran)
.dimension(marketDimension)
.metric(indexMetric)
.threshold(4)
.intervals(fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(addRowsIndexConstant))
.build();
MaterializedViewQuery query = new MaterializedViewQuery(topNQuery, optimizer);
String json = jsonMapper.writeValueAsString(query);
Query serdeQuery = jsonMapper.readValue(json, Query.class);
Assert.assertEquals(query, serdeQuery);
Assert.assertEquals(new TableDataSource(dataSource), query.getDataSource());
Assert.assertEquals(allGran, query.getGranularity());
Assert.assertEquals(fullOnInterval.getIntervals(), query.getIntervals());
}
}

View File

@ -0,0 +1,231 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.materializedview;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.java.util.common.Intervals;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
import io.druid.segment.TestHelper;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Set;
public class MaterializedViewUtilsTest
{
private static ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
@Test
public void testGetRequiredFieldsFromGroupByQuery() throws Exception
{
String queryStr = "{\n" +
" \"queryType\": \"groupBy\",\n" +
" \"dataSource\": \"sample_datasource\",\n" +
" \"granularity\": \"day\",\n" +
" \"dimensions\": [\"country\", \"device\"],\n" +
" \"limitSpec\": { \"type\": \"default\", \"limit\": 5000, \"columns\": [\"country\", \"data_transfer\"] },\n" +
" \"filter\": {\n" +
" \"type\": \"and\",\n" +
" \"fields\": [\n" +
" { \"type\": \"selector\", \"dimension\": \"carrier\", \"value\": \"AT&T\" },\n" +
" { \"type\": \"or\", \n" +
" \"fields\": [\n" +
" { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Apple\" },\n" +
" { \"type\": \"selector\", \"dimension\": \"make\", \"value\": \"Samsung\" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"aggregations\": [\n" +
" { \"type\": \"longSum\", \"name\": \"total_usage\", \"fieldName\": \"user_count\" },\n" +
" { \"type\": \"doubleSum\", \"name\": \"data_transfer\", \"fieldName\": \"data_transfer\" }\n" +
" ],\n" +
" \"postAggregations\": [\n" +
" { \"type\": \"arithmetic\",\n" +
" \"name\": \"avg_usage\",\n" +
" \"fn\": \"/\",\n" +
" \"fields\": [\n" +
" { \"type\": \"fieldAccess\", \"fieldName\": \"data_transfer\" },\n" +
" { \"type\": \"fieldAccess\", \"fieldName\": \"total_usage\" }\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ],\n" +
" \"having\": {\n" +
" \"type\": \"greaterThan\",\n" +
" \"aggregation\": \"total_usage\",\n" +
" \"value\": 100\n" +
" }\n" +
"}";
GroupByQuery query = jsonMapper.readValue(queryStr, GroupByQuery.class);
Set<String> fields = MaterializedViewUtils.getRequiredFields(query);
Assert.assertEquals(
Sets.newHashSet("country", "device", "carrier", "make", "user_count", "data_transfer"),
fields
);
}
@Test
public void testGetRequiredFieldsFromTopNQuery() throws Exception
{
String queryStr = "{\n" +
" \"queryType\": \"topN\",\n" +
" \"dataSource\": \"sample_data\",\n" +
" \"dimension\": \"sample_dim\",\n" +
" \"threshold\": 5,\n" +
" \"metric\": \"count\",\n" +
" \"granularity\": \"all\",\n" +
" \"filter\": {\n" +
" \"type\": \"and\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"type\": \"selector\",\n" +
" \"dimension\": \"dim1\",\n" +
" \"value\": \"some_value\"\n" +
" },\n" +
" {\n" +
" \"type\": \"selector\",\n" +
" \"dimension\": \"dim2\",\n" +
" \"value\": \"some_other_val\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"aggregations\": [\n" +
" {\n" +
" \"type\": \"longSum\",\n" +
" \"name\": \"count\",\n" +
" \"fieldName\": \"count\"\n" +
" },\n" +
" {\n" +
" \"type\": \"doubleSum\",\n" +
" \"name\": \"some_metric\",\n" +
" \"fieldName\": \"some_metric\"\n" +
" }\n" +
" ],\n" +
" \"postAggregations\": [\n" +
" {\n" +
" \"type\": \"arithmetic\",\n" +
" \"name\": \"average\",\n" +
" \"fn\": \"/\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"type\": \"fieldAccess\",\n" +
" \"name\": \"some_metric\",\n" +
" \"fieldName\": \"some_metric\"\n" +
" },\n" +
" {\n" +
" \"type\": \"fieldAccess\",\n" +
" \"name\": \"count\",\n" +
" \"fieldName\": \"count\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"intervals\": [\n" +
" \"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000\"\n" +
" ]\n" +
"}";
TopNQuery query = jsonMapper.readValue(queryStr, TopNQuery.class);
Set<String> fields = MaterializedViewUtils.getRequiredFields(query);
Assert.assertEquals(
Sets.newHashSet("sample_dim", "dim1", "dim2", "count", "some_metric"),
fields
);
}
@Test
public void testGetRequiredFieldsFromTimeseriesQuery() throws Exception
{
String queryStr = "{\n" +
" \"queryType\": \"timeseries\",\n" +
" \"dataSource\": \"sample_datasource\",\n" +
" \"granularity\": \"day\",\n" +
" \"descending\": \"true\",\n" +
" \"filter\": {\n" +
" \"type\": \"and\",\n" +
" \"fields\": [\n" +
" { \"type\": \"selector\", \"dimension\": \"sample_dimension1\", \"value\": \"sample_value1\" },\n" +
" { \"type\": \"or\",\n" +
" \"fields\": [\n" +
" { \"type\": \"selector\", \"dimension\": \"sample_dimension2\", \"value\": \"sample_value2\" },\n" +
" { \"type\": \"selector\", \"dimension\": \"sample_dimension3\", \"value\": \"sample_value3\" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"aggregations\": [\n" +
" { \"type\": \"longSum\", \"name\": \"sample_name1\", \"fieldName\": \"sample_fieldName1\" },\n" +
" { \"type\": \"doubleSum\", \"name\": \"sample_name2\", \"fieldName\": \"sample_fieldName2\" }\n" +
" ],\n" +
" \"postAggregations\": [\n" +
" { \"type\": \"arithmetic\",\n" +
" \"name\": \"sample_divide\",\n" +
" \"fn\": \"/\",\n" +
" \"fields\": [\n" +
" { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name1\", \"fieldName\": \"sample_name1\" },\n" +
" { \"type\": \"fieldAccess\", \"name\": \"postAgg__sample_name2\", \"fieldName\": \"sample_name2\" }\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"intervals\": [ \"2012-01-01T00:00:00.000/2012-01-03T00:00:00.000\" ]\n" +
"}";
TimeseriesQuery query = jsonMapper.readValue(queryStr, TimeseriesQuery.class);
Set<String> fields = MaterializedViewUtils.getRequiredFields(query);
Assert.assertEquals(
Sets.newHashSet("sample_dimension1", "sample_dimension2", "sample_dimension3", "sample_fieldName1",
"sample_fieldName2"),
fields
);
}
@Test
public void testIntervalMinus()
{
List<Interval> intervalList1 = Lists.newArrayList(
Intervals.of("2012-01-02T00:00:00.000/2012-01-03T00:00:00.000"),
Intervals.of("2012-01-08T00:00:00.000/2012-01-10T00:00:00.000"),
Intervals.of("2012-01-16T00:00:00.000/2012-01-17T00:00:00.000")
);
List<Interval> intervalList2 = Lists.newArrayList(
Intervals.of("2012-01-01T00:00:00.000/2012-01-04T00:00:00.000"),
Intervals.of("2012-01-05T00:00:00.000/2012-01-10T00:00:00.000"),
Intervals.of("2012-01-16T00:00:00.000/2012-01-18T00:00:00.000"),
Intervals.of("2012-01-19T00:00:00.000/2012-01-20T00:00:00.000")
);
List<Interval> result = MaterializedViewUtils.minus(intervalList2, intervalList1);
Assert.assertEquals(
Lists.newArrayList(
Intervals.of("2012-01-01T00:00:00.000/2012-01-02T00:00:00.000"),
Intervals.of("2012-01-03T00:00:00.000/2012-01-04T00:00:00.000"),
Intervals.of("2012-01-05T00:00:00.000/2012-01-08T00:00:00.000"),
Intervals.of("2012-01-17T00:00:00.000/2012-01-18T00:00:00.000"),
Intervals.of("2012-01-19T00:00:00.000/2012-01-20T00:00:00.000")
),
result
);
}
}
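
`testIntervalMinus` pins down the semantics of `MaterializedViewUtils.minus`: the result is the portion of the first interval list that is not covered by the second. Below is a re-implementation sketch of those semantics (not the extension's own code), assuming both lists are sorted by start time and contain no overlapping intervals, as in the test data:

```java
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

public class IntervalMinusSketch
{
  // Returns the parts of each interval in `minuend` that are not covered by `subtrahend`.
  public static List<Interval> minus(List<Interval> minuend, List<Interval> subtrahend)
  {
    List<Interval> result = new ArrayList<>();
    for (Interval m : minuend) {
      long start = m.getStartMillis();
      long end = m.getEndMillis();
      for (Interval s : subtrahend) {
        if (s.getEndMillis() <= start || s.getStartMillis() >= end) {
          continue; // no overlap with the remaining uncovered range
        }
        if (s.getStartMillis() > start) {
          // The gap before this subtrahend interval stays uncovered.
          result.add(new Interval(start, s.getStartMillis()));
        }
        start = Math.max(start, s.getEndMillis());
      }
      if (start < end) {
        result.add(new Interval(start, end));
      }
    }
    return result;
  }
}
```

Run against `intervalList2` and `intervalList1` from the test, this yields the same five gap intervals the assertion expects.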

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.Pair;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -64,12 +65,24 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return false;
}
@Override
public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata)
{
return false;
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval)
{
return ImmutableList.of();
}
@Override
public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval)
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals

View File

@ -147,6 +147,8 @@
<module>extensions-contrib/kafka-emitter</module>
<module>extensions-contrib/redis-cache</module>
<module>extensions-contrib/opentsdb-emitter</module>
<module>extensions-contrib/materialized-view-maintenance</module>
<module>extensions-contrib/materialized-view-selection</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import io.druid.java.util.common.Pair;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -43,6 +44,15 @@ public interface IndexerMetadataStorageCoordinator
*/
List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);
/**
* Get all used segments and the created_date of these segments in a given datasource and interval
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used segments are requested. Start is inclusive, end is exclusive
* @return The used DataSegments, each paired with its created_date, that contain data in the requested interval
*/
List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
/**
* Get all segments which may include any data in the interval and are flagged as used.
*
@ -154,6 +164,16 @@ public interface IndexerMetadataStorageCoordinator
*/
boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException;
/**
* Insert dataSourceMetadata entry for 'dataSource'.
*
* @param dataSource identifier
* @param dataSourceMetadata value to set
*
* @return true if the entry was inserted, false otherwise
*/
boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
void updateSegmentMetadata(Set<DataSegment> segments);
void deleteSegments(Set<DataSegment> segments);

View File

@ -39,6 +39,7 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.logger.Logger;
@ -61,10 +62,13 @@ import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -1007,4 +1011,58 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}
@Override
public List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval)
{
return connector.retryWithHandle(
handle -> handle.createQuery(
StringUtils.format(
"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource " +
"AND start >= :start AND %2$send%2$s <= :end AND used = true",
dbTables.getSegmentsTable(), connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(new ResultSetMapper<Pair<DataSegment, String>>()
{
@Override
public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
return new Pair<>(
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
r.getString("created_date"));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
})
.list()
);
}
@Override
public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata metadata)
{
return 1 == connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES" +
" (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("commit_metadata_payload", jsonMapper.writeValueAsBytes(metadata))
.bind("commit_metadata_sha1", BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(jsonMapper.writeValueAsBytes(metadata)).asBytes()))
.execute()
);
}
}
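
The new `getUsedSegmentAndCreatedDateForInterval` returns each used segment in the interval together with the raw created_date column value, which gives callers a way to reason about how recently that part of a datasource was built. A hypothetical caller sketch (`SegmentFreshnessSketch` and `latestCreatedDate` are illustrative names, not part of the patch), assuming created_date is stored as an ISO-8601 timestamp string as this coordinator writes it:

```java
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.java.util.common.Pair;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;

public class SegmentFreshnessSketch
{
  // Returns the most recent created_date among used segments fully contained in the interval,
  // or null if the datasource has no used segments there.
  public static DateTime latestCreatedDate(
      IndexerMetadataStorageCoordinator coordinator,
      String dataSource,
      Interval interval
  )
  {
    List<Pair<DataSegment, String>> segments =
        coordinator.getUsedSegmentAndCreatedDateForInterval(dataSource, interval);
    DateTime latest = null;
    for (Pair<DataSegment, String> entry : segments) {
      DateTime created = DateTime.parse(entry.rhs); // rhs carries the created_date string
      if (latest == null || created.isAfter(latest)) {
        latest = created;
      }
    }
    return latest;
  }
}
```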