DruidInputSource: Fix issues in column projection, timestamp handling. (#10267)

* DruidInputSource: Fix issues in column projection, timestamp handling.

DruidInputSource, DruidSegmentReader changes:

1) Remove "dimensions" and "metrics". They are not necessary, because we
   can compute which columns we need to read based on what is going to
   be used by the timestamp, transform, dimensions, and metrics.
2) Start using ColumnsFilter (see below) to decide which columns we need
   to read.
3) Actually respect the "timestampSpec". Previously, it was ignored, and
   the timestamp of the returned InputRows was set to the `__time` column
   of the input datasource.

(1) and (2) together fix a bug in which the DruidInputSource would not
properly read columns that are used as inputs to a transformSpec.

(3) fixes a bug where the timestampSpec would be ignored if you attempted
to set the column to something other than `__time`.

(1) and (3) are breaking changes.

Web console changes:

1) Remove "Dimensions" and "Metrics" from the Druid input source.
2) Set timestampSpec to `{"column": "__time", "format": "millis"}` for
   compatibility with the new behavior.

Other changes:

1) Add ColumnsFilter, a new class that allows input readers to determine
   which columns they need to read. Currently, it's only used by the
   DruidInputSource, but it could be used by other columnar input sources
   in the future. (See the usage sketch after this list.)
2) Add a ColumnsFilter to InputRowSchema.
3) Remove the metric names from InputRowSchema (they were unused).
4) Add InputRowSchemas.fromDataSchema method that computes the proper
   ColumnsFilter for a given timestamp, dimensions, transform, and metrics.
5) Add "getRequiredColumns" method to TransformSpec to support the above.

* Various fixups.

* Uncomment incorrectly commented lines.

* Move TransformSpecTest to the proper module.

* Add druid.indexer.task.ignoreTimestampSpecForDruidInputSource setting.

* Fix.

* Fix build.

* Checkstyle.

* Misc fixes.

* Fix test.

* Move config.

* Fix imports.

* Fixup.

* Fix ShuffleResourceTest.

* Add import.

* Smarter exclusions.

* Fixes based on tests.

Also, add TIME_COLUMN constant in the web console.

* Adjustments for tests.

* Reorder test data.

* Update docs.

* Update docs to say Druid 0.22.0 instead of 0.21.0.

* Fix test.

* Fix ITAutoCompactionTest.

* Changes from review & from merging.
This commit is contained in:
Gian Merlino 2021-03-25 10:32:21 -07:00 committed by GitHub
parent efc5d7d112
commit bf20f9e979
78 changed files with 2005 additions and 414 deletions

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
/**
* Used by some {@link InputSourceReader} implementations in order to know what columns will need to be read out
* of the {@link InputRow} objects they create.
*
* This is meant to be useful as an optimization: if we're reading from a columnar data format, then when a column
* isn't going to be needed, we shouldn't read it.
*
* @see InputSource#reader accepts objects of this class
*/
public abstract class ColumnsFilter
{
/**
* Accepts all columns.
*/
public static ColumnsFilter all()
{
return new ExclusionBased(Collections.emptySet());
}
/**
* Accepts a specific list of columns.
*/
public static ColumnsFilter inclusionBased(final Set<String> inclusions)
{
return new InclusionBased(inclusions);
}
/**
* Accepts all columns, except those on a specific list.
*/
public static ColumnsFilter exclusionBased(final Set<String> exclusions)
{
return new ExclusionBased(exclusions);
}
/**
* Check if a column should be included or not.
*/
public abstract boolean apply(String column);
/**
* Returns a new filter with a particular column added. The returned filter will return true from {@link #apply}
* on this column.
*/
public abstract ColumnsFilter plus(String column);
public static class InclusionBased extends ColumnsFilter
{
private final Set<String> inclusions;
private InclusionBased(Set<String> inclusions)
{
this.inclusions = inclusions;
}
@Override
public boolean apply(String column)
{
return inclusions.contains(column);
}
@Override
public ColumnsFilter plus(String column)
{
if (inclusions.contains(column)) {
return this;
} else {
final Set<String> copy = new HashSet<>(inclusions);
copy.add(column);
return new InclusionBased(copy);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InclusionBased that = (InclusionBased) o;
return Objects.equals(inclusions, that.inclusions);
}
@Override
public int hashCode()
{
return Objects.hash(inclusions);
}
@Override
public String toString()
{
return "ColumnsFilter.InclusionBased{" +
"inclusions=" + inclusions +
'}';
}
}
public static class ExclusionBased extends ColumnsFilter
{
private final Set<String> exclusions;
public ExclusionBased(Set<String> exclusions)
{
this.exclusions = exclusions;
}
@Override
public boolean apply(String column)
{
return !exclusions.contains(column);
}
@Override
public ColumnsFilter plus(String column)
{
if (!exclusions.contains(column)) {
return this;
} else {
final Set<String> copy = new HashSet<>(exclusions);
copy.remove(column);
return new ExclusionBased(copy);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExclusionBased that = (ExclusionBased) o;
return Objects.equals(exclusions, that.exclusions);
}
@Override
public int hashCode()
{
return Objects.hash(exclusions);
}
@Override
public String toString()
{
return "ColumnsFilter.ExclusionBased{" +
"exclusions=" + exclusions +
'}';
}
}
}

View File

@ -22,8 +22,6 @@ package org.apache.druid.data.input;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import java.util.List;
/**
* Schema of {@link InputRow}.
*/
@ -31,13 +29,17 @@ public class InputRowSchema
{
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final List<String> metricNames;
private final ColumnsFilter columnsFilter;
public InputRowSchema(TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, List<String> metricNames)
public InputRowSchema(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter
)
{
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
this.metricNames = metricNames;
this.columnsFilter = columnsFilter;
}
public TimestampSpec getTimestampSpec()
@ -50,8 +52,17 @@ public class InputRowSchema
return dimensionsSpec;
}
public List<String> getMetricNames()
/**
* A {@link ColumnsFilter} that can filter down the list of columns that must be read after flattening.
*
* Logically, Druid applies ingestion spec components in a particular order: first flattenSpec (if any), then
* timestampSpec, then transformSpec, and finally dimensionsSpec and metricsSpec.
*
* If a flattenSpec is provided, this method returns a filter that should be applied after flattening. So, it will
* be based on what needs to pass between the flattenSpec and everything beyond it.
*/
public ColumnsFilter getColumnsFilter()
{
return metricNames;
return columnsFilter;
}
}

View File

@ -78,5 +78,9 @@ public interface InputSource
* @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true
* @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished.
*/
InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory);
InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
File temporaryDirectory
);
}

View File

@ -25,8 +25,10 @@ import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*
*/
@PublicApi
public class MapBasedInputRow extends MapBasedRow implements InputRow
@ -59,6 +61,28 @@ public class MapBasedInputRow extends MapBasedRow implements InputRow
return dimensions;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
MapBasedInputRow that = (MapBasedInputRow) o;
return Objects.equals(dimensions, that.dimensions);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), dimensions);
}
@Override
public String toString()
{

View File

@ -37,7 +37,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
@ -71,7 +70,7 @@ public class FirehoseFactoryToInputSourceAdaptorTest extends InitializedNullHand
new InputRowSchema(
inputRowParser.getParseSpec().getTimestampSpec(),
inputRowParser.getParseSpec().getDimensionsSpec(),
Collections.emptyList()
ColumnsFilter.all()
),
null,
null

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.ColumnsFilter;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
public class ColumnsFilterTest
{
private static final List<String> COLUMNS = ImmutableList.of("a", "b", "c");
@Test
public void testAll()
{
Assert.assertEquals(
ImmutableList.of("a", "b", "c"),
apply(ColumnsFilter.all(), COLUMNS)
);
}
@Test
public void testInclusionBased()
{
Assert.assertEquals(
ImmutableList.of("b"),
apply(ColumnsFilter.inclusionBased(ImmutableSet.of("b")), COLUMNS)
);
}
@Test
public void testInclusionBasedPlus()
{
Assert.assertEquals(
ColumnsFilter.inclusionBased(ImmutableSet.of("a", "b", "c")),
ColumnsFilter.inclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
);
}
@Test
public void testExclusionBased()
{
Assert.assertEquals(
ImmutableList.of("a", "c"),
apply(ColumnsFilter.exclusionBased(ImmutableSet.of("b")), COLUMNS)
);
}
@Test
public void testExclusionBasedPlus()
{
Assert.assertEquals(
ColumnsFilter.exclusionBased(ImmutableSet.of("b")),
ColumnsFilter.exclusionBased(ImmutableSet.of("b", "c")).plus("a").plus("c")
);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(ColumnsFilter.InclusionBased.class).usingGetClass().verify();
EqualsVerifier.forClass(ColumnsFilter.ExclusionBased.class).usingGetClass().verify();
}
private List<String> apply(ColumnsFilter columnsFilter, List<String> columns)
{
return columns.stream().filter(columnsFilter::apply).collect(Collectors.toList());
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -37,7 +38,6 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@ -47,7 +47,7 @@ public class CsvReaderTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
Collections.emptyList()
ColumnsFilter.all()
);
@BeforeClass
@ -229,7 +229,7 @@ public class CsvReaderTest
new InputRowSchema(
new TimestampSpec("Timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -35,7 +36,6 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -44,7 +44,7 @@ public class DelimitedReaderTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
Collections.emptyList()
ColumnsFilter.all()
);
@BeforeClass

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.DateTimes;
@ -37,7 +38,6 @@ import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class InputEntityIteratingReaderTest
@ -64,7 +64,7 @@ public class InputEntityIteratingReaderTest
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "name", "score"))
),
Collections.emptyList()
ColumnsFilter.all()
),
new CsvInputFormat(
ImmutableList.of("time", "name", "score"),

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -66,7 +67,7 @@ public class JsonLineReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -116,7 +117,7 @@ public class JsonLineReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -158,7 +159,7 @@ public class JsonLineReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -200,7 +201,7 @@ public class JsonLineReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -242,7 +243,7 @@ public class JsonLineReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -38,7 +39,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.Collections;
public class JsonReaderTest
{
@ -75,7 +75,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -141,7 +141,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -194,7 +194,8 @@ public class JsonReaderTest
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}"
//baz property is illegal
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
);
@ -202,7 +203,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -255,7 +256,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -314,7 +315,8 @@ public class JsonReaderTest
//2nd row is ill-formed
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n" //value of baz is invalid
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n"
//value of baz is invalid
+ "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n")
);
@ -322,7 +324,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -377,7 +379,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null
@ -431,7 +433,7 @@ public class JsonReaderTest
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
ColumnsFilter.all()
),
source,
null

View File

@ -1286,6 +1286,7 @@ Additional peon configs include:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
@ -1350,6 +1351,7 @@ then the value from the configuration below is used:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, the Indexer will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.peon.taskActionClient.retry.minWait`|The minimum retry time to communicate with Overlord.|PT5S|
|`druid.peon.taskActionClient.retry.maxWait`|The maximum retry time to communicate with Overlord.|PT1M|
|`druid.peon.taskActionClient.retry.maxRetryCount`|The maximum number of retries to communicate with Overlord.|60|

View File

@ -1292,60 +1292,82 @@ no `inputFormat` field needs to be specified in the ingestion spec when using th
|type|This should be "druid".|yes|
|dataSource|A String defining the Druid datasource to fetch rows from|yes|
|interval|A String representing an ISO-8601 interval, which defines the time range to fetch the data over.|yes|
|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no|
|metrics|The list of Strings containing the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
|filter| See [Filters](../querying/filters.md). Only rows that match the filter, if specified, will be returned.|no|
The Druid input source can be used for a variety of purposes, including:
- Creating new datasources that are rolled-up copies of existing datasources.
- Changing the [partitioning or sorting](index.md#partitioning) of a datasource to improve performance.
- Updating or removing rows using a [`transformSpec`](index.md#transformspec).
When using the Druid input source, the timestamp column shows up as a numeric field named `__time` set to the number
of milliseconds since the epoch (January 1, 1970 00:00:00 UTC). It is common to use this in the timestampSpec, if you
want the output timestamp to be equivalent to the input timestamp. In this case, set the timestamp column to `__time`
and the format to `auto` or `millis`.
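For example, a timestampSpec that carries the input `__time` value through unchanged looks like this:
```json
"timestampSpec": {
  "column": "__time",
  "format": "millis"
}
```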
It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the
previous data for the intervals specified in the `granularitySpec`. Generally, if you are going to do this, it is a
good idea to test out your reindexing by writing to a separate datasource before overwriting your main one.
Alternatively, if your goals can be satisfied by [compaction](compaction.md), consider that instead as a simpler
approach.
An example task spec is shown below. It reads from a hypothetical raw datasource `wikipedia_raw` and creates a new
rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and "page".
```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia_rollup",
      "timestampSpec": {
        "column": "__time",
        "format": "millis"
      },
      "dimensionsSpec": {
        "dimensions": [
          "countryName",
          "page"
        ]
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "cnt"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "queryGranularity": "HOUR",
        "segmentGranularity": "DAY",
        "intervals": ["2016-06-27/P1D"],
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "druid",
        "dataSource": "wikipedia_raw",
        "interval": "2016-06-27/P1D"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "hashed"
      },
      "forceGuaranteedRollup": true,
      "maxNumConcurrentSubTasks": 1
    }
  }
}
```
> Note: Older versions (0.19 and earlier) did not respect the timestampSpec when using the Druid input source. If you
> have ingestion specs that rely on this and cannot rewrite them, set
> [`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`](../configuration/index.md#indexer-general-configuration)
> to `true` to enable a compatibility mode where the timestampSpec is ignored.
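As a sketch of that compatibility mode, the flag goes in the runtime properties of the services that run ingestion tasks (the property name is the one documented in the configuration reference above):
```properties
# Compatibility mode: ignore the timestampSpec for the Druid input source and read __time directly.
druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true
```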
### SQL Input Source

View File

@ -39,6 +39,7 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@ -111,7 +112,8 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
private static final OssClientConfig CLOUD_CONFIG_PROPERTIES = new OssClientConfig(
"test.oss-cn.aliyun.com",
new DefaultPasswordProvider("myKey"),
new DefaultPasswordProvider("mySecret"));
new DefaultPasswordProvider("mySecret")
);
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@ -454,7 +456,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@ -497,7 +499,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.AvroHadoopInputRowParserTest;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -199,10 +200,9 @@ public class AvroOCFReaderTest
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"eventType")));
final List<String> metricNames = ImmutableList.of("someLong");
final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, metricNames);
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(someAvroFile);
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}

View File

@ -31,6 +31,7 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@ -226,7 +227,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@ -269,7 +270,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -76,7 +77,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
Collections.emptyList()
ColumnsFilter.all()
);
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
Arrays.asList(TimestampSpec.DEFAULT_COLUMN, COLUMN),

View File

@ -2781,7 +2781,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
true,
null,
null,
null
null,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();

View File

@ -2867,7 +2867,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
true,
null,
null,
null
null,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.orc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@ -259,7 +260,7 @@ public class OrcReaderTest
String dataFile
) throws IOException
{
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList());
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, ColumnsFilter.all());
final FileEntity entity = new FileEntity(new File(dataFile));
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -34,7 +35,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
@ -49,7 +49,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("field"))),
ImmutableList.of()
ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@ -114,7 +114,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "col", "col"),
@ -200,7 +200,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByLogicalMap", "$.intToStringColumn.1"),
@ -315,7 +315,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("repeatedInt"))),
Collections.emptyList()
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "repeatedInt", "repeatedInt")
@ -353,7 +353,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec", "extracted1", "extracted2"))),
Collections.emptyList()
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted1", "$.myComplex[0].id"),
@ -395,7 +395,7 @@ public class CompatParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"),

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -49,7 +50,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("fixed_len_dec"))),
ImmutableList.of("metric1")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "fixed_len_dec", "fixed_len_dec"),
@ -86,7 +87,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec"))),
ImmutableList.of("metric1")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i32_dec"),
@ -123,7 +124,7 @@ public class DecimalParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i64_dec"))),
ImmutableList.of("metric1")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i64_dec"),

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -33,7 +34,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
@ -69,7 +69,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
InputEntityReader reader = createReader(
@ -103,7 +103,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@ -136,7 +136,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@ -177,7 +177,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@ -217,7 +217,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))),
ImmutableList.of("metric1")
ColumnsFilter.all()
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
InputEntityReader reader = createReader(
@ -253,7 +253,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,
@ -286,7 +286,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
@ -329,7 +329,7 @@ public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -39,7 +40,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Objects;
public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
@ -55,7 +55,7 @@ public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
),
Collections.emptyList()
ColumnsFilter.all()
);
FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -31,7 +32,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
@ -46,12 +46,12 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schemaAsString = new InputRowSchema(
new TimestampSpec("date_as_string", "Y-M-d", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
InputRowSchema schemaAsDate = new InputRowSchema(
new TimestampSpec("date_as_date", null, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
InputEntityReader readerAsString = createReader(
file,
@ -104,7 +104,7 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
InputEntityReader reader = createReader(file, schema, JSONPathSpec.DEFAULT);
@ -130,7 +130,7 @@ public class TimestampsParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
ColumnsFilter.all()
);
InputEntityReader reader = createReader(
file,

View File

@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -31,7 +32,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
@ -45,7 +45,7 @@ public class WikiParquetReaderTest extends BaseParquetReaderTest
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))),
Collections.emptyList()
ColumnsFilter.all()
);
InputEntityReader reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT);

View File

@ -40,6 +40,7 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
@ -509,7 +510,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(
@ -553,7 +554,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
ColumnsFilter.all()
);
InputSourceReader reader = inputSource.reader(

View File

@ -34,6 +34,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* @deprecated only used by {@link org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory}
*/
@Deprecated
public class ReingestionTimelineUtils
{
/**

View File

@ -67,6 +67,9 @@ public class TaskConfig
@JsonProperty
private final List<StorageLocationConfig> shuffleDataLocations;
@JsonProperty
private final boolean ignoreTimestampSpecForDruidInputSource;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@ -77,7 +80,8 @@ public class TaskConfig
@JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@ -102,6 +106,7 @@ public class TaskConfig
} else {
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
}
@JsonProperty
@ -178,6 +183,12 @@ public class TaskConfig
return shuffleDataLocations;
}
@JsonProperty
public boolean isIgnoreTimestampSpecForDruidInputSource()
{
return ignoreTimestampSpecForDruidInputSource;
}
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {

View File

@ -26,7 +26,6 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.indexer.TaskStatus;
@ -42,6 +41,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
@ -49,7 +49,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
@ -66,7 +65,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -176,16 +174,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
ParseExceptionHandler parseExceptionHandler
) throws IOException
{
final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(Collectors.toList());
final InputSourceReader inputSourceReader = dataSchema.getTransformSpec().decorate(
inputSource.reader(
new InputRowSchema(
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
metricsNames
),
InputRowSchemas.fromDataSchema(dataSchema),
inputFormat,
tmpDir
)

View File

@ -624,12 +624,13 @@ public class CompactionTask extends AbstractBatchIndexTask
interval,
null,
null,
dataSchema.getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
null,
null,
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
retryPolicyFactory,
toolbox.getConfig()
),
null,
false
@ -699,7 +700,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return new DataSchema(
dataSource,
new TimestampSpec(null, null, null),
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
finalDimensionsSpec,
finalMetricsSpec,
uniformGranularitySpec,

View File

@ -76,6 +76,7 @@ public class InputSourceProcessor
? (DynamicPartitionsSpec) partitionsSpec
: null;
final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
try (
final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
tmpDir,

View File

@ -62,6 +62,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* @deprecated use {@link DruidInputSource} instead
*/
@Deprecated
public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);

View File

@ -22,10 +22,10 @@ package org.apache.druid.indexing.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
@ -39,10 +39,11 @@ import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexing.common.ReingestionTimelineUtils;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -50,6 +51,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -70,15 +72,27 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
/**
* An {@link org.apache.druid.data.input.InputSource} that allows reading from Druid segments.
*
* Used internally by {@link org.apache.druid.indexing.common.task.CompactionTask}, and can also be used directly.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DruidInputSource extends AbstractInputSource implements SplittableInputSource<List<WindowedSegmentId>>
{
private static final Logger LOG = new Logger(DruidInputSource.class);
/**
* Timestamp formats that the standard __time column can be parsed with.
*/
private static final List<String> STANDARD_TIME_COLUMN_FORMATS = ImmutableList.of("millis", "auto");
/**
* A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals
* (which is arbitrary, and only here for totality of ordering).
@ -113,12 +127,21 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
@Nullable
private final List<WindowedSegmentId> segmentIds;
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final IndexIO indexIO;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;
private final TaskConfig taskConfig;
/**
* Included for serde backwards-compatibility only. Not used.
*/
private final List<String> dimensions;
/**
* Included for serde backwards-compatibility only. Not used.
*/
private final List<String> metrics;
@JsonCreator
public DruidInputSource(
@ -133,7 +156,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
@JacksonInject IndexIO indexIO,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory
@JacksonInject RetryPolicyFactory retryPolicyFactory,
@JacksonInject TaskConfig taskConfig
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@ -150,6 +174,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
this.taskConfig = Preconditions.checkNotNull(taskConfig, "null taskConfig");
}
@JsonProperty
@ -167,7 +192,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
@Nullable
@JsonProperty("segments")
@JsonInclude(Include.NON_NULL)
public List<WindowedSegmentId> getSegmentIds()
{
return segmentIds;
@ -179,12 +203,18 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
return dimFilter;
}
/**
* Included for serde backwards-compatibility only. Not used.
*/
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
/**
* Included for serde backwards-compatibility only. Not used.
*/
@JsonProperty
public List<String> getMetrics()
{
@ -207,28 +237,38 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
.from(partitionHolder)
.transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
}).iterator();
final List<String> effectiveDimensions = ReingestionTimelineUtils.getDimensionsToReingest(
dimensions,
inputRowSchema.getDimensionsSpec(),
timeline
);
List<String> effectiveMetrics;
if (metrics == null) {
effectiveMetrics = ReingestionTimelineUtils.getUniqueMetrics(timeline);
final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
final InputRowSchema inputRowSchemaToUse;
if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
// Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
+ "the 'druid' input source, set druid.indexer.task.ignoreTimestampSpecForDruidInputSource to false.");
inputRowSchemaToUse = new InputRowSchema(
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
);
} else {
effectiveMetrics = metrics;
inputRowSchemaToUse = inputRowSchema;
}
final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(
indexIO,
dimFilter,
effectiveDimensions,
effectiveMetrics
);
if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
&& !STANDARD_TIME_COLUMN_FORMATS.contains(inputRowSchemaToUse.getTimestampSpec().getTimestampFormat())) {
// Slight chance the user did this intentionally, but not likely. Log a warning.
LOG.warn(
"The provided timestampSpec refers to the %s column without using format %s. If you wanted to read the "
+ "column as-is, switch formats.",
inputRowSchemaToUse.getTimestampSpec().getTimestampColumn(),
STANDARD_TIME_COLUMN_FORMATS
);
}
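// Sketch of when the warning above fires: a timestampSpec such as new TimestampSpec("__time", "iso", null)
// names the __time column but asks for "iso" parsing. Since __time stores epoch milliseconds, reading the
// column as-is requires the "millis" or "auto" format.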
return new InputEntityIteratingReader(
inputRowSchema,
inputRowSchemaToUse,
inputFormat,
entityIterator,
temporaryDirectory
@ -300,7 +340,8 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
indexIO,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
retryPolicyFactory,
taskConfig
);
}
@ -310,6 +351,43 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
return false;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DruidInputSource that = (DruidInputSource) o;
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(segmentIds, that.segmentIds)
&& Objects.equals(dimFilter, that.dimFilter)
&& Objects.equals(dimensions, that.dimensions)
&& Objects.equals(metrics, that.metrics);
}
@Override
public int hashCode()
{
return Objects.hash(dataSource, interval, segmentIds, dimFilter, dimensions, metrics);
}
@Override
public String toString()
{
return "DruidInputSource{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", segmentIds=" + segmentIds +
", dimFilter=" + dimFilter +
(dimensions != null ? ", dimensions=" + dimensions : "") +
(metrics != null ? ", metrics=" + metrics : "") +
'}';
}
public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
CoordinatorClient coordinatorClient,
RetryPolicyFactory retryPolicyFactory,

View File

@ -27,26 +27,19 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexIO;
import java.io.File;
import java.util.List;
public class DruidSegmentInputFormat implements InputFormat
{
private final IndexIO indexIO;
private final DimFilter dimFilter;
private List<String> dimensions;
private List<String> metrics;
DruidSegmentInputFormat(
public DruidSegmentInputFormat(
IndexIO indexIO,
DimFilter dimFilter,
List<String> dimensions,
List<String> metrics
DimFilter dimFilter
)
{
this.indexIO = indexIO;
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
}
@Override
@ -65,8 +58,9 @@ public class DruidSegmentInputFormat implements InputFormat
return new DruidSegmentReader(
source,
indexIO,
dimensions,
metrics,
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter(),
dimFilter,
temporaryDirectory
);

View File

@ -21,12 +21,18 @@ package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -35,56 +41,64 @@ import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnProcessorFactory;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
private final DruidSegmentInputEntity source;
private final IndexIO indexIO;
private final List<String> dimensions;
private final List<String> metrics;
private final ColumnsFilter columnsFilter;
private final InputRowSchema inputRowSchema;
private final DimFilter dimFilter;
private final File temporaryDirectory;
DruidSegmentReader(
InputEntity source,
IndexIO indexIO,
List<String> dimensions,
List<String> metrics,
DimFilter dimFilter,
File temporaryDirectory
final InputEntity source,
final IndexIO indexIO,
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter,
final DimFilter dimFilter,
final File temporaryDirectory
)
{
Preconditions.checkArgument(source instanceof DruidSegmentInputEntity);
this.source = (DruidSegmentInputEntity) source;
this.indexIO = indexIO;
this.dimensions = dimensions;
this.metrics = metrics;
this.columnsFilter = columnsFilter;
this.inputRowSchema = new InputRowSchema(
timestampSpec,
dimensionsSpec,
columnsFilter
);
this.dimFilter = dimFilter;
this.temporaryDirectory = temporaryDirectory;
}
@ -109,10 +123,23 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
null
);
// Retain order of columns from the original segments. Useful for preserving dimension order if we're in
// schemaless mode.
final Set<String> columnsToRead = Sets.newLinkedHashSet(
Iterables.filter(
Iterables.concat(
Collections.singleton(ColumnHolder.TIME_COLUMN_NAME),
storageAdapter.getAdapter().getAvailableDimensions(),
storageAdapter.getAdapter().getAvailableMetrics()
),
columnsFilter::apply
)
);
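// Minimal sketch of the ColumnsFilter contract assumed here: apply(column) returns whether the column
// should be read. For example:
//
//   ColumnsFilter filter = ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s"));
//   filter.apply("s");    // true
//   filter.apply("cnt");  // false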
final Sequence<Map<String, Object>> sequence = Sequences.concat(
Sequences.map(
cursors,
this::cursorToSequence
cursor -> cursorToSequence(cursor, columnsToRead)
)
);
@ -122,8 +149,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
{
final DateTime timestamp = (DateTime) intermediateRow.get(ColumnHolder.TIME_COLUMN_NAME);
return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, intermediateRow));
}
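// Sketch (mirroring the reader tests): MapInputRowParser.parse applies the schema's timestampSpec and
// dimensionsSpec to the intermediate row, so a map like
//   {"__time": 946684800000L, "s": "foo", "d": 1.23, "cnt": 1L}
// combined with TimestampSpec("__time", "millis", null) yields an InputRow timestamped 2000-01-01T00:00:00Z.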
@Override
@ -137,14 +163,13 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
* Map<String, Object> intermediate rows, selecting the dimensions and metrics of this segment reader.
*
* @param cursor A cursor
*
* @return A sequence of intermediate rows
*/
private Sequence<Map<String, Object>> cursorToSequence(
final Cursor cursor
)
private Sequence<Map<String, Object>> cursorToSequence(final Cursor cursor, final Set<String> columnsToRead)
{
return Sequences.simple(
() -> new IntermediateRowFromCursorIterator(cursor, dimensions, metrics)
() -> new IntermediateRowFromCursorIterator(cursor, columnsToRead)
);
}
@ -152,8 +177,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
* @param sequence A sequence of intermediate rows generated from a sequence of
* cursors in {@link #intermediateRowIterator()}
* @param segmentFile The underlying segment file containing the row data
*
* @return A CloseableIterator from a sequence of intermediate rows, closing the underlying segment file
* when the iterator is closed.
* when the iterator is closed.
*/
@VisibleForTesting
static CloseableIterator<Map<String, Object>> makeCloseableIteratorFromSequenceAndSegmentFile(
@ -190,6 +216,66 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
};
}
/**
* Reads columns for {@link IntermediateRowFromCursorIterator}.
*/
private static class IntermediateRowColumnProcessorFactory implements ColumnProcessorFactory<Supplier<Object>>
{
private static final IntermediateRowColumnProcessorFactory INSTANCE = new IntermediateRowColumnProcessorFactory();
@Override
public ValueType defaultType()
{
return ValueType.STRING;
}
@Override
public Supplier<Object> makeDimensionProcessor(DimensionSelector selector, boolean multiValue)
{
return () -> {
final IndexedInts vals = selector.getRow();
int valsSize = vals.size();
if (valsSize == 1) {
return selector.lookupName(vals.get(0));
} else if (valsSize > 1) {
List<String> dimVals = new ArrayList<>(valsSize);
for (int i = 0; i < valsSize; ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
return dimVals;
}
return null;
};
}
@Override
public Supplier<Object> makeFloatProcessor(BaseFloatColumnValueSelector selector)
{
return () -> selector.isNull() ? null : selector.getFloat();
}
@Override
public Supplier<Object> makeDoubleProcessor(BaseDoubleColumnValueSelector selector)
{
return () -> selector.isNull() ? null : selector.getDouble();
}
@Override
public Supplier<Object> makeLongProcessor(BaseLongColumnValueSelector selector)
{
return () -> selector.isNull() ? null : selector.getLong();
}
@Override
public Supplier<Object> makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
{
return selector::getObject;
}
}
/**
* Given a {@link Cursor}, a list of dimension names, and a list of metric names, this iterator
* returns the rows of the cursor as Map<String, Object> intermediate rows.
@ -197,39 +283,25 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
private static class IntermediateRowFromCursorIterator implements Iterator<Map<String, Object>>
{
private final Cursor cursor;
private final BaseLongColumnValueSelector timestampColumnSelector;
private final Map<String, DimensionSelector> dimSelectors;
private final Map<String, BaseObjectColumnValueSelector> metSelectors;
private final Map<String, Supplier<Object>> columnReaders;
public IntermediateRowFromCursorIterator(
Cursor cursor,
List<String> dimensionNames,
List<String> metricNames
final Cursor cursor,
final Set<String> columnsToRead
)
{
this.cursor = cursor;
this.columnReaders = CollectionUtils.newLinkedHashMapWithExpectedSize(columnsToRead.size());
timestampColumnSelector = cursor
.getColumnSelectorFactory()
.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
dimSelectors = new HashMap<>();
for (String dim : dimensionNames) {
final DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
}
}
metSelectors = new HashMap<>();
for (String metric : metricNames) {
final BaseObjectColumnValueSelector metricSelector = cursor
.getColumnSelectorFactory()
.makeColumnValueSelector(metric);
metSelectors.put(metric, metricSelector);
for (String column : columnsToRead) {
columnReaders.put(
column,
ColumnProcessors.makeProcessor(
column,
IntermediateRowColumnProcessorFactory.INSTANCE,
cursor.getColumnSelectorFactory()
)
);
}
}
@ -245,46 +317,18 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
if (!hasNext()) {
throw new NoSuchElementException();
}
final Map<String, Object> theEvent =
CollectionUtils.newLinkedHashMapWithExpectedSize(dimSelectors.size() + metSelectors.size() + 1);
final Map<String, Object> rowMap =
CollectionUtils.newLinkedHashMapWithExpectedSize(columnReaders.size());
for (Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
int valsSize = vals.size();
if (valsSize == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else if (valsSize > 1) {
List<String> dimVals = new ArrayList<>(valsSize);
for (int i = 0; i < valsSize; ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Entry<String, BaseObjectColumnValueSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final BaseObjectColumnValueSelector selector = metSelector.getValue();
Object value = selector.getObject();
for (Entry<String, Supplier<Object>> entry : columnReaders.entrySet()) {
final Object value = entry.getValue().get();
if (value != null) {
theEvent.put(metric, value);
rowMap.put(entry.getKey(), value);
}
}
// Timestamp is added last because we expect that the time column will always be a date time object.
// If it is added earlier, it can be overwritten by metrics or dimensions with the same name.
//
// If a user names a metric or dimension `__time` it will be overwritten. This case should be rare since
// __time is reserved for the time column in druid segments.
final long timestamp = timestampColumnSelector.getLong();
theEvent.put(ColumnHolder.TIME_COLUMN_NAME, DateTimes.utc(timestamp));
cursor.advance();
return theEvent;
return rowMap;
}
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.transform.Transform;
import org.apache.druid.segment.transform.TransformSpec;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Utilities that are helpful when implementing {@link org.apache.druid.data.input.InputEntityReader}.
*/
public class InputRowSchemas
{
private InputRowSchemas()
{
// No instantiation.
}
/**
* Creates an {@link InputRowSchema} from a given {@link DataSchema}.
*/
public static InputRowSchema fromDataSchema(final DataSchema dataSchema)
{
return new InputRowSchema(
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
createColumnsFilter(
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
dataSchema.getTransformSpec(),
dataSchema.getAggregators()
)
);
}
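// Sketch of intended usage, mirroring the callers updated in this patch:
//
//   final InputRowSchema schema = InputRowSchemas.fromDataSchema(dataSchema);
//   final InputSourceReader reader = inputSource.reader(schema, inputFormat, temporaryDirectory);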
/**
* Build a {@link ColumnsFilter} that can filter down the list of columns that must be read after flattening.
*
* @see InputRowSchema#getColumnsFilter()
*/
@VisibleForTesting
static ColumnsFilter createColumnsFilter(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final TransformSpec transformSpec,
final AggregatorFactory[] aggregators
)
{
// We'll need to know what fields are generated from transforms, vs. expected from the raw data.
final Set<String> transformOutputNames =
transformSpec.getTransforms().stream().map(Transform::getName).collect(Collectors.toSet());
if (dimensionsSpec.hasCustomDimensions()) {
// We need an inclusion-based filter.
final Set<String> inclusions = new HashSet<>();
// Add timestamp column.
inclusions.add(timestampSpec.getTimestampColumn());
// Add all transform inputs.
inclusions.addAll(transformSpec.getRequiredColumns());
// Add all dimension inputs that are *not* transform outputs.
for (String column : dimensionsSpec.getDimensionNames()) {
if (!transformOutputNames.contains(column)) {
inclusions.add(column);
}
}
// Add all aggregator inputs that are *not* transform outputs.
for (AggregatorFactory aggregator : aggregators) {
for (String column : aggregator.requiredFields()) {
if (!transformOutputNames.contains(column)) {
inclusions.add(column);
}
}
}
return ColumnsFilter.inclusionBased(inclusions);
} else {
// Schemaless dimensions mode: we need an exclusion-based filter.
// Start from the list of dimension exclusions.
final Set<String> exclusions = new HashSet<>(dimensionsSpec.getDimensionExclusions());
// Remove (un-exclude) timestamp column.
exclusions.remove(timestampSpec.getTimestampColumn());
// Remove (un-exclude) all transform inputs.
exclusions.removeAll(transformSpec.getRequiredColumns());
// Remove (un-exclude) all aggregator inputs that are *not* transform outputs.
for (AggregatorFactory aggregator : aggregators) {
for (String column : aggregator.requiredFields()) {
if (!transformOutputNames.contains(column)) {
exclusions.remove(column);
}
}
}
return ColumnsFilter.exclusionBased(exclusions);
}
}
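// Worked example (see InputRowSchemasTest): with timestamp column "ts", an explicit dimension "foo",
// a transform filter on "bar", a transform reading "qux", and an aggregator reading "bob", the
// custom-dimensions branch above yields ColumnsFilter.inclusionBased({"ts", "foo", "bar", "qux", "bob"}).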
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.io.Closer;
@ -51,8 +52,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -175,10 +175,10 @@ public class InputSourceSampler
columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
for (Row row : index) {
Map<String, Object> parsed = new HashMap<>();
Map<String, Object> parsed = new LinkedHashMap<>();
columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
Number sortKey = row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
if (sortKey != null) {
@ -215,14 +215,7 @@ public class InputSourceSampler
File tempDir
)
{
final List<String> metricsNames = Arrays.stream(dataSchema.getAggregators())
.map(AggregatorFactory::getName)
.collect(Collectors.toList());
final InputRowSchema inputRowSchema = new InputRowSchema(
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
metricsNames
);
final InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(dataSchema);
InputSourceReader reader = inputSource.reader(inputRowSchema, inputFormat, tempDir);

View File

@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@ -70,7 +71,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
@ -106,7 +106,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -245,13 +244,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
this.task = task;
this.ioConfig = task.getIOConfig();
this.tuningConfig = task.getTuningConfig();
this.inputRowSchema = new InputRowSchema(
task.getDataSchema().getTimestampSpec(),
task.getDataSchema().getDimensionsSpec(),
Arrays.stream(task.getDataSchema().getAggregators())
.map(AggregatorFactory::getName)
.collect(Collectors.toList())
);
this.inputRowSchema = InputRowSchemas.fromDataSchema(task.getDataSchema());
this.inputFormat = ioConfig.getInputFormat();
this.parser = parser;
this.authorizerMapper = authorizerMapper;

View File

@ -102,7 +102,7 @@ public class TaskToolboxTest
EasyMock.replay(task, mockHandoffNotifierFactory);
taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null),
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
mockTaskActionClientFactory,
mockEmitter,

View File

@ -1505,7 +1505,18 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
};
taskLockbox = new TaskLockbox(taskStorage, mdc);
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskConfig taskConfig = new TaskConfig(
directory.getPath(),
null,
null,
50000,
null,
true,
null,
null,
null,
false
);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,

View File

@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
@ -1188,7 +1189,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
return new TaskToolbox(
null,
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
null,
createActionClient(task),
null,

View File

@ -1479,7 +1479,11 @@ public class CompactionTaskTest
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
Assert.assertEquals(new TimestampSpec(null, null, null), dataSchema.getTimestampSpec());
Assert.assertEquals(
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
dataSchema.getTimestampSpec()
);
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensions()),
new HashSet<>(dataSchema.getDimensionsSpec().getDimensions())
@ -1511,11 +1515,6 @@ public class CompactionTaskTest
Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval());
Assert.assertNull(druidInputSource.getDimFilter());
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
new HashSet<>(druidInputSource.getDimensions())
);
// assert tuningConfig
Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
}
@ -1552,7 +1551,7 @@ public class CompactionTaskTest
)
{
super(
null,
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
null,
taskActionClient,
null,

View File

@ -116,7 +116,8 @@ public class HadoopTaskTest
false,
null,
null,
null
null,
false
)).once();
EasyMock.replay(toolbox);

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -311,7 +312,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
);
final TaskToolbox box = new TaskToolbox(
null,
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClient,
null,

View File

@ -888,7 +888,18 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
final File directory
)
{
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
final TaskConfig taskConfig = new TaskConfig(
directory.getPath(),
null,
null,
50000,
null,
true,
null,
null,
null,
false
);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));

View File

@ -202,7 +202,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false
),
null
);
@ -519,6 +520,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
{
final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false);
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE)
@ -535,6 +538,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
.addValue(CoordinatorClient.class, coordinatorClient)
.addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper))
.addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
.addValue(TaskConfig.class, taskConfig)
);
objectMapper.registerSubtypes(
new NamedType(ParallelIndexSupervisorTask.class, ParallelIndexSupervisorTask.TYPE),
@ -550,7 +554,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
return new TaskToolbox(
null,
new TaskConfig(null, null, null, null, null, false, null, null, null, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
actionClient,
null,

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class DruidInputSourceTest
{
private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
private final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class);
private final SegmentLoaderFactory segmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
private final RetryPolicyFactory retryPolicyFactory = EasyMock.createMock(RetryPolicyFactory.class);
private final TaskConfig taskConfig = EasyMock.createMock(TaskConfig.class);
private ObjectMapper mapper = null;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUp()
{
mapper = TestHelper.makeJsonMapper();
mapper.registerModules(new IndexingServiceInputSourceModule().getJacksonModules());
final InjectableValues.Std injectableValues = (InjectableValues.Std) mapper.getInjectableValues();
injectableValues.addValue(IndexIO.class, indexIO);
injectableValues.addValue(CoordinatorClient.class, coordinatorClient);
injectableValues.addValue(SegmentLoaderFactory.class, segmentLoaderFactory);
injectableValues.addValue(RetryPolicyFactory.class, retryPolicyFactory);
injectableValues.addValue(TaskConfig.class, taskConfig);
}
@Test
public void testSerdeUsingIntervals() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"dataSource\":\"foo\","
+ "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\""
+ "}";
final InputSource inputSource = mapper.readValue(json, InputSource.class);
Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
Assert.assertEquals(
new DruidInputSource(
"foo",
Intervals.of("2000/2001"),
null,
null,
null,
null,
indexIO,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory,
taskConfig
),
inputSource
);
Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
}
@Test
public void testSerdeUsingIntervalsAndLegacyDimensionsMetrics() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"dataSource\":\"foo\","
+ "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\","
+ "\"dimensions\":[\"a\"],"
+ "\"metrics\":[\"b\"]"
+ "}";
final InputSource inputSource = mapper.readValue(json, InputSource.class);
Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
Assert.assertEquals(
new DruidInputSource(
"foo",
Intervals.of("2000/2001"),
null,
null,
ImmutableList.of("a"),
ImmutableList.of("b"),
indexIO,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory,
taskConfig
),
inputSource
);
Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
}
@Test
public void testSerdeUsingSegments() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"dataSource\":\"foo\","
+ "\"segments\":["
+ "{\"segmentId\":\"foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123\","
+ "\"intervals\":[\"2000-01-01T00:00:00.000Z/2000-01-01T12:00:00.000Z\"]}"
+ "]"
+ "}";
final InputSource inputSource = mapper.readValue(json, InputSource.class);
Assert.assertThat(inputSource, CoreMatchers.instanceOf(DruidInputSource.class));
Assert.assertEquals(
new DruidInputSource(
"foo",
null,
ImmutableList.of(
new WindowedSegmentId(
"foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123",
ImmutableList.of(Intervals.of("2000-01-01T00/2000-01-01T12"))
)
),
null,
null,
null,
indexIO,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory,
taskConfig
),
inputSource
);
Assert.assertEquals(json, mapper.writeValueAsString(inputSource));
}
@Test
public void testSerdeUsingBothIntervalsAndSegments() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"dataSource\":\"foo\","
+ "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\","
+ "\"segments\":["
+ " {\"segmentId\":\"foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123\","
+ " \"intervals\":[\"2000-01-01T00:00:00.000Z/2000-01-01T12:00:00.000Z\"]}"
+ "]"
+ "}";
expectedException.expect(JsonProcessingException.class);
expectedException.expectMessage("Specify exactly one of 'interval' and 'segments'");
mapper.readValue(json, InputSource.class);
}
@Test
public void testSerdeUsingNeitherIntervalsNorSegments() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"dataSource\":\"foo\""
+ "}";
expectedException.expect(JsonProcessingException.class);
expectedException.expectMessage("Specify exactly one of 'interval' and 'segments'");
mapper.readValue(json, InputSource.class);
}
@Test
public void testSerdeUsingNoDataSource() throws Exception
{
final String json = "{"
+ "\"type\":\"druid\","
+ "\"interval\":\"2000-01-01T00:00:00.000Z/2001-01-01T00:00:00.000Z\""
+ "}";
expectedException.expect(JsonProcessingException.class);
expectedException.expectMessage("dataSource");
mapper.readValue(json, InputSource.class);
}
}

View File

@ -19,23 +19,528 @@
package org.apache.druid.indexing.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.hll.HyperLogLogHash;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class DruidSegmentReaderTest
public class DruidSegmentReaderTest extends NullHandlingTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private File segmentDirectory;
private final IndexIO indexIO = TestHelper.getTestIndexIO();
@Before
public void setUp() throws IOException
{
// Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).
final IncrementalIndex<?> incrementalIndex =
IndexBuilder.create()
.schema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
)
)
.withMetrics(
new CountAggregatorFactory("cnt"),
new HyperUniquesAggregatorFactory("met_s", "s")
)
.withRollup(false)
.build()
)
.rows(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "foo")
.put("d", 1.23)
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "bar")
.put("d", 4.56)
.build()
)
)
)
.buildIncrementalIndex();
segmentDirectory = temporaryFolder.newFolder();
try {
TestHelper.getTestIndexMergerV9(
OnHeapMemorySegmentWriteOutMediumFactory.instance()
).persist(
incrementalIndex,
segmentDirectory,
new IndexSpec(),
null
);
}
finally {
incrementalIndex.close();
}
}
@Test
public void testReader() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderAutoTimestampFormat() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "auto", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderWithDimensionExclusions() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(),
ImmutableList.of("__time", "s", "cnt", "met_s"),
ImmutableList.of()
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderWithInclusiveColumnsFilter() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s", "d")),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.build()
),
new MapBasedInputRow(
DateTimes.of("2000T01"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderWithInclusiveColumnsFilterNoTimestamp() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("s", "d")),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "foo")
.put("d", 1.23d)
.build()
),
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("s", "bar")
.put("d", 4.56d)
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderWithFilter() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "millis", DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
new SelectorDimFilter("d", "1.23", null),
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("2000"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderTimestampFromDouble() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("d", "posix", null),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1970-01-01T00:00:01.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("1970-01-01T00:00:04.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderTimestampAsPosixIncorrectly() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec("__time", "posix", null),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("31969-04-01T00:00:00.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("31969-05-12T16:00:00.000Z"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testReaderTimestampSpecDefault() throws IOException
{
final DruidSegmentReader reader = new DruidSegmentReader(
makeInputEntity(Intervals.of("2000/P1D")),
indexIO,
new TimestampSpec(null, null, DateTimes.of("1971")),
new DimensionsSpec(
ImmutableList.of(
StringDimensionSchema.create("s"),
new DoubleDimensionSchema("d")
)
),
ColumnsFilter.all(),
null,
temporaryFolder.newFolder()
);
Assert.assertEquals(
ImmutableList.of(
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T").getMillis())
.put("s", "foo")
.put("d", 1.23d)
.put("cnt", 1L)
.put("met_s", makeHLLC("foo"))
.build()
),
new MapBasedInputRow(
DateTimes.of("1971"),
ImmutableList.of("s", "d"),
ImmutableMap.<String, Object>builder()
.put("__time", DateTimes.of("2000T01").getMillis())
.put("s", "bar")
.put("d", 4.56d)
.put("cnt", 1L)
.put("met_s", makeHLLC("bar"))
.build()
)
),
readRows(reader)
);
}
@Test
public void testMakeCloseableIteratorFromSequenceAndSegmentFileCloseYielderOnClose() throws IOException
{
@ -80,4 +585,65 @@ public class DruidSegmentReaderTest
Assert.assertTrue("File is not closed", isFileClosed.booleanValue());
Assert.assertTrue("Sequence is not closed", isSequenceClosed.booleanValue());
}
private DruidSegmentInputEntity makeInputEntity(final Interval interval)
{
return new DruidSegmentInputEntity(
new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
@Override
public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
{
throw new UnsupportedOperationException("unused");
}
@Override
public File getSegmentFiles(DataSegment segment)
{
return segmentDirectory;
}
@Override
public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}
},
DataSegment.builder()
.dataSource("ds")
.dimensions(ImmutableList.of("s", "d"))
.metrics(ImmutableList.of("cnt", "met_s"))
.interval(Intervals.of("2000/P1D"))
.version("1")
.size(0)
.build(),
interval
);
}
private List<InputRow> readRows(final DruidSegmentReader reader) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
while (iterator.hasNext()) {
rows.addAll(reader.parseInputRows(iterator.next()));
}
}
return rows;
}
private static HyperLogLogCollector makeHLLC(final String... values)
{
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (String value : values) {
collector.add(HyperLogLogHash.getDefault().hash(value));
}
return collector;
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Test;
public class InputRowSchemasTest extends NullHandlingTest
{
@Test
public void test_createColumnsFilter_normal()
{
final ColumnsFilter columnsFilter = InputRowSchemas.createColumnsFilter(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(
ImmutableList.of(StringDimensionSchema.create("foo")),
ImmutableList.of(),
ImmutableList.of()
),
new TransformSpec(
new SelectorDimFilter("bar", "x", null),
ImmutableList.of(
new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil())
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("billy", "bob")
}
);
Assert.assertEquals(
ColumnsFilter.inclusionBased(
ImmutableSet.of(
"ts",
"foo",
"bar",
"qux",
"bob"
)
),
columnsFilter
);
}
@Test
public void test_createColumnsFilter_schemaless()
{
final ColumnsFilter columnsFilter = InputRowSchemas.createColumnsFilter(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(
ImmutableList.of(),
ImmutableList.of("ts", "foo", "bar", "qux", "bob"),
ImmutableList.of()
),
new TransformSpec(
new SelectorDimFilter("bar", "x", null),
ImmutableList.of(
new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil())
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("billy", "bob")
}
);
Assert.assertEquals(
ColumnsFilter.exclusionBased(
ImmutableSet.of(
"foo"
)
),
columnsFilter
);
}
}

View File

@ -89,7 +89,8 @@ public class SingleTaskBackgroundRunnerTest
true,
null,
null,
null
null,
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();
final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory(

View File

@ -599,7 +599,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null);
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false);
return new TaskToolboxFactory(
taskConfig,

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.seekablestream;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@ -79,7 +80,7 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
new InputRowSchema(
new TimestampSpec("col_0", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(colNames.subList(1, colNames.size()))),
Collections.emptyList()
ColumnsFilter.all()
),
inputFormat,
temporaryFolder.newFolder()

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.seekablestream;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
@ -109,7 +110,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,
@ -157,7 +158,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,
@ -179,7 +180,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,
@ -202,7 +203,7 @@ public class StreamChunkParserTest
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()),
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,

View File

@ -88,7 +88,8 @@ public class WorkerTaskManagerTest
false,
null,
null,
null
null,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);

View File

@ -162,7 +162,8 @@ public class WorkerTaskMonitorTest
false,
null,
null,
null
null,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class);

View File

@ -87,7 +87,8 @@ public class IntermediaryDataManagerAutoCleanupTest
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
{

View File

@ -70,7 +70,8 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);

View File

@ -69,7 +69,8 @@ public class ShuffleDataSegmentPusherTest
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null))
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);

View File

@ -95,7 +95,8 @@ public class ShuffleResourceTest
false,
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
{

View File

@ -68,3 +68,7 @@ druid_indexer_logs_directory=/shared/tasklogs
druid_sql_enable=true
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_request_logging_type=slf4j
# Testing the legacy config from https://github.com/apache/druid/pull/10267
# Can remove this when the flag is no longer needed
druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
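For reference, outside the Docker env-var syntax used above, the same toggle would be set in a runtime properties file roughly as follows (a sketch; the dotted property name comes from the commit description, and the file placement is the usual convention rather than something shown in this diff):

# Restore the pre-0.22.0 behavior: DruidInputSource keeps reading the __time column
# and ignores any timestampSpec that points elsewhere.
druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true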

View File

@ -129,7 +129,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
22482,
22481,
0,
0,
3,
@ -275,7 +275,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
22482,
22481,
0,
0,
3,

View File

@ -10,7 +10,8 @@
]
},
"timestampSpec": {
"column": "timestamp"
"column": "ignored-see-ignoreTimestampSpecForDruidInputSource",
"format": "iso"
},
"metricsSpec": [
{

View File

@ -24,7 +24,7 @@
},
"timestampSpec": {
"column": "__time",
"format": "iso"
"format": "millis"
},
"dimensionsSpec": {
"dimensionExclusions" : ["robot", "continent"]

View File

@ -24,7 +24,7 @@
},
"timestampSpec": {
"column": "__time",
"format": "iso"
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import org.apache.druid.data.input.Row;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprMacroTable;
@ -32,12 +33,15 @@ import org.apache.druid.segment.virtual.ExpressionSelectors;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
public class ExpressionTransform implements Transform
{
private final String name;
private final String expression;
private final ExprMacroTable macroTable;
private final Supplier<Expr> parsedExpression;
@JsonCreator
public ExpressionTransform(
@ -49,6 +53,9 @@ public class ExpressionTransform implements Transform
this.name = Preconditions.checkNotNull(name, "name");
this.expression = Preconditions.checkNotNull(expression, "expression");
this.macroTable = macroTable;
this.parsedExpression = Suppliers.memoize(
() -> Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable"))
)::get;
}
@JsonProperty
@ -67,8 +74,13 @@ public class ExpressionTransform implements Transform
@Override
public RowFunction getRowFunction()
{
final Expr expr = Parser.parse(expression, Preconditions.checkNotNull(this.macroTable, "macroTable"));
return new ExpressionRowFunction(expr);
return new ExpressionRowFunction(parsedExpression.get());
}
@Override
public Set<String> getRequiredColumns()
{
return parsedExpression.get().analyzeInputs().getRequiredBindings();
}
static class ExpressionRowFunction implements RowFunction
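The memoized parse above is also what backs the new getRequiredColumns(): the transform's inputs fall out of the expression's binding analysis. A short usage sketch built only from calls that appear in this patch (not an excerpt from it):

ExpressionTransform transform = new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil());
Set<String> fromTransform = transform.getRequiredColumns();          // ["qux"]

// The same information, one level lower:
Expr expr = Parser.parse("qux + 3", ExprMacroTable.nil());
Set<String> fromExpr = expr.analyzeInputs().getRequiredBindings();   // also ["qux"]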

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.guice.annotations.ExtensionPoint;
import java.util.Set;
/**
* A row transform that is part of a {@link TransformSpec}. Transforms allow adding new fields to input rows. Each
* one has a "name" (the name of the new field) which can be referred to by DimensionSpecs, AggregatorFactories, etc.
@ -52,4 +54,9 @@ public interface Transform
* as output.
*/
RowFunction getRowFunction();
/**
* Returns the names of all columns that this transform is going to read.
*/
Set<String> getRequiredColumns();
}
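To make the contract concrete, a hypothetical third-party Transform (not part of this patch) that upper-cases one source column might look like the sketch below. It assumes RowFunction exposes a single eval(Row) method usable as a lambda, and it omits the Jackson annotations a real extension would need for serialization:

public class UpperCaseTransform implements Transform
{
  private final String name;
  private final String fieldName;

  public UpperCaseTransform(String name, String fieldName)
  {
    this.name = name;
    this.fieldName = fieldName;
  }

  @Override
  public String getName()
  {
    return name;
  }

  @Override
  public RowFunction getRowFunction()
  {
    // Row.getRaw(column) returns the untransformed value of the source column.
    return row -> {
      final Object value = row.getRaw(fieldName);
      return value == null ? null : value.toString().toUpperCase(java.util.Locale.ROOT);
    };
  }

  @Override
  public Set<String> getRequiredColumns()
  {
    // Declare the one column this transform reads, so column-projecting input
    // sources such as DruidInputSource know they must load it.
    return Collections.singleton(fieldName);
  }
}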

View File

@ -126,6 +126,21 @@ public class TransformSpec
return new Transformer(this);
}
public Set<String> getRequiredColumns()
{
final Set<String> requiredColumns = new HashSet<>();
if (filter != null) {
requiredColumns.addAll(filter.getRequiredColumns());
}
for (Transform transform : transforms) {
requiredColumns.addAll(transform.getRequiredColumns());
}
return requiredColumns;
}
@Override
public boolean equals(final Object o)
{
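Taken together, getRequiredColumns() is simply the union of the filter's required columns and every transform's. For example, reusing the same fixture objects that appear in the test near the top of this diff:

TransformSpec transformSpec = new TransformSpec(
    new SelectorDimFilter("bar", "x", null),
    ImmutableList.of(new ExpressionTransform("baz", "qux + 3", ExprMacroTable.nil()))
);

// The filter reads "bar" and the expression reads "qux":
Set<String> required = transformSpec.getRequiredColumns();   // {"bar", "qux"}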

View File

@ -17,11 +17,12 @@
* under the License.
*/
package org.apache.druid.segment.indexing;
package org.apache.druid.segment.transform;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
@ -33,8 +34,6 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
@ -80,6 +79,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
)
);
Assert.assertEquals(
ImmutableSet.of("x", "y", "a", "b", "f", "g"),
transformSpec.getRequiredColumns()
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parseBatch(ROW1).get(0);
@ -108,6 +112,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
)
);
Assert.assertEquals(
ImmutableSet.of("x", "y"),
transformSpec.getRequiredColumns()
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parseBatch(ROW1).get(0);
@ -139,6 +148,12 @@ public class TransformSpecTest extends InitializedNullHandlingTest
)
);
Assert.assertEquals(
ImmutableSet.of("x", "f", "g", "y", "a", "b"),
transformSpec.getRequiredColumns()
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
Assert.assertNotNull(parser.parseBatch(ROW1).get(0));
Assert.assertNull(parser.parseBatch(ROW2).get(0));
@ -154,6 +169,12 @@ public class TransformSpecTest extends InitializedNullHandlingTest
)
);
Assert.assertEquals(
ImmutableSet.of("a", "b"),
transformSpec.getRequiredColumns()
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parseBatch(ROW1).get(0);
@ -172,6 +193,11 @@ public class TransformSpecTest extends InitializedNullHandlingTest
)
);
Assert.assertEquals(
ImmutableSet.of("__time"),
transformSpec.getRequiredColumns()
);
final InputRowParser<Map<String, Object>> parser = transformSpec.decorate(PARSER);
final InputRow row = parser.parseBatch(ROW1).get(0);

View File

@ -27,8 +27,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
@ -36,6 +36,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
@ -45,8 +46,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@ -55,7 +55,6 @@ import java.util.stream.Collectors;
public class DataSchema
{
private static final Logger log = new Logger(DataSchema.class);
private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
private final String dataSource;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
@ -150,35 +149,47 @@ public class DataSchema
IdUtils.validateId("dataSource", dataSource);
}
/**
* Computes the {@link DimensionsSpec} that we will actually use. It is derived from, but not necessarily identical
* to, the one that we were given.
*/
private static DimensionsSpec computeDimensionsSpec(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] aggregators
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final AggregatorFactory[] aggregators
)
{
final Set<String> dimensionExclusions = new HashSet<>();
final Set<String> inputFieldNames = new HashSet<>();
final Set<String> outputFieldNames = new HashSet<>();
final String timestampColumn = timestampSpec.getTimestampColumn();
if (!(dimensionsSpec.hasCustomDimensions() && dimensionsSpec.getDimensionNames().contains(timestampColumn))) {
dimensionExclusions.add(timestampColumn);
}
// Populate inputFieldNames.
inputFieldNames.add(timestampSpec.getTimestampColumn());
inputFieldNames.addAll(dimensionsSpec.getDimensionNames());
Arrays.stream(aggregators)
.flatMap(aggregator -> aggregator.requiredFields().stream())
.forEach(inputFieldNames::add);
for (AggregatorFactory aggregator : aggregators) {
dimensionExclusions.addAll(aggregator.requiredFields());
dimensionExclusions.add(aggregator.getName());
}
// Populate outputFieldNames, validating along the way for lack of duplicates.
outputFieldNames.add(ColumnHolder.TIME_COLUMN_NAME);
final Set<String> metSet = Arrays.stream(aggregators).map(AggregatorFactory::getName).collect(Collectors.toSet());
final Set<String> dimSet = new HashSet<>(dimensionsSpec.getDimensionNames());
final Set<String> overlap = Sets.intersection(metSet, dimSet);
if (!overlap.isEmpty()) {
throw new IAE(
"Cannot have overlapping dimensions and metrics of the same name. Please change the name of the metric. Overlap: %s",
overlap
);
}
Stream.concat(
dimensionsSpec.getDimensions().stream().map(DimensionSchema::getName),
Arrays.stream(aggregators).map(AggregatorFactory::getName)
).forEach(
field -> {
if (!outputFieldNames.add(field)) {
throw new IAE("Cannot specify field [%s] more than once", field);
}
}
);
return dimensionsSpec.withDimensionExclusions(Sets.difference(dimensionExclusions, dimSet));
// Set up additional exclusions: all inputs and outputs, minus defined dimensions.
final Set<String> additionalDimensionExclusions = new HashSet<>();
additionalDimensionExclusions.addAll(inputFieldNames);
additionalDimensionExclusions.addAll(outputFieldNames);
additionalDimensionExclusions.removeAll(dimensionsSpec.getDimensionNames());
return dimensionsSpec.withDimensionExclusions(additionalDimensionExclusions);
}
@JsonProperty
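Stated briefly, the rewritten computeDimensionsSpec now excludes every known input and output field that is not an explicitly declared dimension, with __time always counted as an output. A standalone illustration using plain collections (not the Druid classes), in the spirit of the updated DataSchemaTest expectations later in this diff:

// Timestamp column plus aggregator inputs on one side, __time plus aggregator names on the other.
Set<String> inputFields = new HashSet<>(Arrays.asList("time", "col1", "col2"));
Set<String> outputFields = new HashSet<>(Arrays.asList("__time", "metric1", "metric2"));
Set<String> declaredDimensions = Collections.emptySet();      // schemaless dimension discovery

Set<String> exclusions = new HashSet<>(inputFields);
exclusions.addAll(outputFields);
exclusions.removeAll(declaredDimensions);
// exclusions == {"__time", "time", "col1", "col2", "metric1", "metric2"}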

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
@ -54,7 +55,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@ -80,7 +80,7 @@ public class SqlInputSourceTest
new ArrayList<>(),
new ArrayList<>()
),
Collections.emptyList()
ColumnsFilter.all()
);
@Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();

View File

@ -101,7 +101,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
);
Assert.assertEquals(
ImmutableSet.of("time", "col1", "col2", "metric1", "metric2"),
ImmutableSet.of("__time", "time", "col1", "col2", "metric1", "metric2"),
schema.getDimensionsSpec().getDimensionExclusions()
);
}
@ -139,7 +139,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
);
Assert.assertEquals(
ImmutableSet.of("dimC", "col1", "metric1", "metric2"),
ImmutableSet.of("__time", "dimC", "col1", "metric1", "metric2"),
schema.getParser().getParseSpec().getDimensionsSpec().getDimensionExclusions()
);
}
@ -409,7 +409,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
actual.getParser().getParseSpec(),
new JSONParseSpec(
new TimestampSpec("xXx", null, null),
new DimensionsSpec(null, Arrays.asList("metric1", "xXx", "col1"), null),
new DimensionsSpec(null, Arrays.asList("__time", "metric1", "xXx", "col1"), null),
null,
null,
null

View File

@ -115,50 +115,50 @@ function validateConnectLocalData(preview: string) {
expect(firstLine).toBe(
'Druid row: {' +
'"__time":1442018818771' +
',"isRobot":"false"' +
',"countryIsoCode":null' +
',"added":"36"' +
',"regionName":null' +
',"channel":"#en.wikipedia"' +
',"delta":"36"' +
',"isUnpatrolled":"false"' +
',"isNew":"false"' +
',"isMinor":"false"' +
',"isAnonymous":"false"' +
',"deleted":"0"' +
',"cityName":null' +
',"metroCode":null' +
',"namespace":"Talk"' +
',"comment":"added project"' +
',"countryName":null' +
',"isAnonymous":"false"' +
',"isMinor":"false"' +
',"isNew":"false"' +
',"isRobot":"false"' +
',"isUnpatrolled":"false"' +
',"namespace":"Talk"' +
',"page":"Talk:Oswald Tilghman"' +
',"user":"GELongstreet"' +
',"added":"36"' +
',"deleted":"0"' +
',"delta":"36"' +
',"cityName":null' +
',"countryIsoCode":null' +
',"countryName":null' +
',"regionIsoCode":null' +
',"regionName":null' +
',"metroCode":null' +
'}',
);
const lastLine = lines[lines.length - 1];
expect(lastLine).toBe(
'Druid row: {' +
'"__time":1442020314823' +
',"isRobot":"false"' +
',"countryIsoCode":null' +
',"added":"1"' +
',"regionName":null' +
',"channel":"#en.wikipedia"' +
',"delta":"1"' +
',"isUnpatrolled":"false"' +
',"isNew":"false"' +
',"isMinor":"true"' +
',"isAnonymous":"false"' +
',"deleted":"0"' +
',"cityName":null' +
',"metroCode":null' +
',"namespace":"Main"' +
',"comment":"/* History */[[WP:AWB/T|Typo fixing]], [[WP:AWB/T|typo(s) fixed]]: nothern → northern using [[Project:AWB|AWB]]"' +
',"countryName":null' +
',"isAnonymous":"false"' +
',"isMinor":"true"' +
',"isNew":"false"' +
',"isRobot":"false"' +
',"isUnpatrolled":"false"' +
',"namespace":"Main"' +
',"page":"Hapoel Katamon Jerusalem F.C."' +
',"user":"The Quixotic Potato"' +
',"added":"1"' +
',"deleted":"0"' +
',"delta":"1"' +
',"cityName":null' +
',"countryIsoCode":null' +
',"countryName":null' +
',"regionIsoCode":null' +
',"regionName":null' +
',"metroCode":null' +
'}',
);
}

View File

@ -485,32 +485,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
</p>
),
},
{
name: 'inputSource.dimensions',
label: 'Dimensions',
type: 'string-array',
placeholder: '(optional)',
hideInMore: true,
info: (
<p>
The list of dimensions to select. If left empty, no dimensions are returned. If left
null or not defined, all dimensions are returned.
</p>
),
},
{
name: 'inputSource.metrics',
label: 'Metrics',
type: 'string-array',
placeholder: '(optional)',
hideInMore: true,
info: (
<p>
The list of metrics to select. If left empty, no metrics are returned. If left null or
not defined, all metrics are selected.
</p>
),
},
{
name: 'inputSource.filter',
label: 'Filter',

View File

@ -32,11 +32,18 @@ import { Transform } from './transform-spec';
const NO_SUCH_COLUMN = '!!!_no_such_column_!!!';
export const TIME_COLUMN = '__time';
export const PLACEHOLDER_TIMESTAMP_SPEC: TimestampSpec = {
column: NO_SUCH_COLUMN,
missingValue: '1970-01-01T00:00:00Z',
};
export const REINDEX_TIMESTAMP_SPEC: TimestampSpec = {
column: TIME_COLUMN,
format: 'millis',
};
export const CONSTANT_TIMESTAMP_SPEC: TimestampSpec = {
column: NO_SUCH_COLUMN,
missingValue: '2010-01-01T00:00:00Z',
@ -48,7 +55,7 @@ export function getTimestampSchema(spec: IngestionSpec): TimestampSchema {
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
const timeTransform = transforms.find(transform => transform.name === '__time');
const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
if (timeTransform) return 'expression';
const timestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec') || EMPTY_OBJECT;
@ -74,7 +81,7 @@ export function getTimestampSpecExpressionFromSpec(spec: IngestionSpec): string
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
const timeTransform = transforms.find(transform => transform.name === '__time');
const timeTransform = transforms.find(transform => transform.name === TIME_COLUMN);
if (!timeTransform) return;
return timeTransform.expression;
}

View File

@ -29,6 +29,8 @@ import {
isDruidSource,
MetricSpec,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
TimestampSpec,
Transform,
TransformSpec,
@ -152,13 +154,13 @@ export function headerFromSampleResponse(options: HeaderFromSampleResponseOption
let columns = sortWithPrefixSuffix(
dedupe(sampleResponse.data.flatMap(s => (s.parsed ? Object.keys(s.parsed) : []))).sort(),
columnOrder || ['__time'],
columnOrder || [TIME_COLUMN],
suffixColumnOrder || [],
alphanumericCompare,
);
if (ignoreTimeColumn) {
columns = columns.filter(c => c !== '__time');
columns = columns.filter(c => c !== TIME_COLUMN);
}
return columns;
@ -290,7 +292,7 @@ export async function sampleForConnect(
ioConfig,
dataSchema: {
dataSource: 'sample',
timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
},
} as any,
@ -338,13 +340,15 @@ export async function sampleForParser(
sampleStrategy,
);
const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
ioConfig,
dataSchema: {
dataSource: 'sample',
timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
},
},
@ -398,7 +402,7 @@ export async function sampleForTimestamp(
dimensionsSpec: {},
timestampSpec,
transformSpec: {
transforms: transforms.filter(transform => transform.name === '__time'),
transforms: transforms.filter(transform => transform.name === TIME_COLUMN),
},
},
},
@ -459,7 +463,7 @@ export async function sampleForTransform(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}).concat(transforms.map(t => t.name)),
);
}
@ -518,7 +522,7 @@ export async function sampleForFilter(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}).concat(transforms.map(t => t.name)),
);
}

View File

@ -55,6 +55,7 @@ import {
} from '../../components';
import { FormGroupWithInfo } from '../../components/form-group-with-info/form-group-with-info';
import { AsyncActionDialog } from '../../dialogs';
import { TIME_COLUMN } from '../../druid-models';
import {
addTimestampTransform,
adjustId,
@ -1221,8 +1222,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
if (druidSource) {
let newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', {
column: '__time',
format: 'iso',
column: TIME_COLUMN,
format: 'millis',
});
if (typeof inputData.rollup === 'boolean') {
@ -1247,7 +1248,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
newSpec,
'spec.dataSchema.dimensionsSpec.dimensions',
Object.keys(inputData.columns)
.filter(k => k !== '__time' && !aggregators[k])
.filter(k => k !== TIME_COLUMN && !aggregators[k])
.map(k => ({
name: k,
type: String(inputData.columns![k].type || 'string').toLowerCase(),
@ -1455,7 +1456,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
let possibleTimestampSpec: TimestampSpec;
if (isDruidSource(spec)) {
possibleTimestampSpec = {
column: '__time',
column: TIME_COLUMN,
format: 'auto',
};
} else {
@ -1608,7 +1609,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
data: {
headerAndRows: headerAndRowsFromSampleResponse({
sampleResponse,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}),
spec,
},
@ -1802,7 +1803,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
transformQueryState: new QueryState({
data: headerAndRowsFromSampleResponse({
sampleResponse,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
}),
lastData: transformQueryState.getSomeData(),
}),
@ -2020,7 +2021,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
filterQueryState: new QueryState({
data: headerAndRowsFromSampleResponse({
sampleResponse,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
parsedOnly: true,
}),
lastData: filterQueryState.getSomeData(),
@ -2043,7 +2044,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const headerAndRowsNoFilter = headerAndRowsFromSampleResponse({
sampleResponse: sampleResponseNoFilter,
columnOrder: ['__time'].concat(inputFormatColumns),
columnOrder: [TIME_COLUMN].concat(inputFormatColumns),
parsedOnly: true,
});
@ -2232,7 +2233,9 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
data: {
headerAndRows: headerAndRowsFromSampleResponse({
sampleResponse,
columnOrder: ['__time'].concat(dimensions ? dimensions.map(getDimensionSpecName) : []),
columnOrder: [TIME_COLUMN].concat(
dimensions ? dimensions.map(getDimensionSpecName) : [],
),
suffixColumnOrder: metricsSpec ? metricsSpec.map(getMetricSpecName) : undefined,
}),
dimensions,

View File

@ -1038,6 +1038,7 @@ baseDir
chatHandlerNumRetries
chatHandlerTimeout
connectorConfig
countryName
dataSchema's
foldCase
forceGuaranteedRollup
@ -1782,6 +1783,7 @@ successfulSending
taskBlackListCleanupPeriod
tasklogs
timeBoundary
timestampSpec
tmp
tmpfs
truststore