Add support segmentGranularity for CompactionTask (#6758)

* Add support segmentGranularity

* add doc and fix combination of options

* improve doc
Jihoon Son 2019-01-03 17:50:45 -08:00 committed by Jonathan Wei
parent bc671ac436
commit 9ad6a733a5
8 changed files with 919 additions and 125 deletions


@ -34,6 +34,7 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"interval": <interval to specify segments to be merged>, "interval": <interval to specify segments to be merged>,
"dimensions" <custom dimensionsSpec>, "dimensions" <custom dimensionsSpec>,
"keepSegmentGranularity": <true or false>, "keepSegmentGranularity": <true or false>,
"segmentGranularity": <segment granularity after compaction>,
"targetCompactionSizeBytes": <target size of compacted segments> "targetCompactionSizeBytes": <target size of compacted segments>
"tuningConfig" <index task tuningConfig>, "tuningConfig" <index task tuningConfig>,
"context": <task context> "context": <task context>
@ -47,11 +48,23 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`dataSource`|DataSource name to be compacted|Yes|
|`interval`|Interval of segments to be compacted|Yes|
|`dimensions`|Custom dimensionsSpec. The compaction task uses this dimensionsSpec if it exists instead of generating one. See below for more details.|No|
|`segmentGranularity`|If this is set, the compaction task will change the segment granularity for the given interval. See [segmentGranularity of Uniform Granularity Spec](./ingestion-spec.html#uniform-granularity-spec) for more details. See the table below for the behavior.|No|
|`keepSegmentGranularity`|Deprecated. Please use `segmentGranularity` instead. See the table below for its behavior.|No|
|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, or `numShards` in tuningConfig.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
|`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No|
### Used segmentGranularity based on `segmentGranularity` and `keepSegmentGranularity`
|SegmentGranularity|keepSegmentGranularity|Used SegmentGranularity|
|------------------|----------------------|-----------------------|
|Non-null|True|Error|
|Non-null|False|Given segmentGranularity|
|Non-null|Null|Given segmentGranularity|
|Null|True|Original segmentGranularity|
|Null|False|ALL segmentGranularity. All events will fall into a single time chunk.|
|Null|Null|Original segmentGranularity|
An example of a compaction task is:
```json
@ -63,9 +76,9 @@ An example of compaction task is
```
This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01` and results in new segments.
Since both `segmentGranularity` and `keepSegmentGranularity` are null, the original segment granularity is retained and not changed after compaction.
To control the number of result segments per time chunk, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more details.
Please note that you can run multiple compactionTasks at the same time. For example, you can run 12 compactionTasks, one per month, instead of running a single task for the entire year.
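For illustration, here is a minimal sketch of a compaction task that explicitly changes the segment granularity. It is not part of this change; the dataSource name, interval, and `DAY` granularity are placeholder values.
```json
{
  "type": "compact",
  "dataSource": "wikipedia",
  "interval": "2017-01-01/2018-01-01",
  "segmentGranularity": "DAY"
}
```
Per the behavior table above, since `segmentGranularity` is non-null and `keepSegmentGranularity` is unset, the given `DAY` granularity would be used for the compacted segments.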
A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html#ingestsegmentfirehose), and `dimensionsSpec` and `metricsSpec`


@ -271,7 +271,8 @@ for the `comment` column.
## GranularitySpec
GranularitySpec defines how to partition a dataSource into [time chunks](../design/index.html#datasources-and-segments).
The default granularitySpec is `uniform`, and can be changed by setting the `type` field.
Currently, `uniform` and `arbitrary` types are supported.
### Uniform Granularity Spec
@ -280,8 +281,8 @@ This spec is used to generated segments with uniform intervals.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| segmentGranularity | string | The granularity to create time chunks at. Multiple segments can be created per time chunk. For example, with 'DAY' `segmentGranularity`, the events of the same day fall into the same time chunk, which can optionally be further partitioned into multiple segments based on other configurations and input size. See [Granularity](../querying/granularities.html) for supported granularities. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularities.html) for supported granularities. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |
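For illustration only (the values below are placeholders, not taken from this change), a `uniform` granularitySpec using the fields above might look like:
```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "NONE",
  "rollup": true,
  "intervals": ["2017-01-01/2017-02-01"]
}
```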
@ -291,7 +292,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularities.html) for supported granularities. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |


@ -54,6 +54,8 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
@ -64,8 +66,8 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -90,6 +92,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -99,12 +102,14 @@ public class CompactionTask extends AbstractTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
private final Interval interval;
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
@Deprecated
@Nullable
private final Boolean keepSegmentGranularity;
private final Granularity segmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
@Nullable
@ -135,7 +140,8 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("interval") final Interval interval, @Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments, @Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec, @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
@Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity, @Nullable @JsonProperty("keepSegmentGranularity") @Deprecated final Boolean keepSegmentGranularity,
@Nullable @JsonProperty("segmentGranularity") final Granularity segmentGranularity,
@Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes, @Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig, @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context, @Nullable @JsonProperty("context") final Map<String, Object> context,
@ -153,12 +159,19 @@ public class CompactionTask extends AbstractTask
throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval); throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval);
} }
if ((keepSegmentGranularity != null && keepSegmentGranularity) && segmentGranularity != null) {
throw new IAE("keepSegmentGranularity and segmentGranularity can't be used together");
}
if (keepSegmentGranularity != null) {
log.warn("keepSegmentGranularity is deprecated. Set a proper segmentGranularity instead");
}
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
this.keepSegmentGranularity = keepSegmentGranularity;
this.segmentGranularity = segmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
@ -188,11 +201,19 @@ public class CompactionTask extends AbstractTask
}
@JsonProperty
@Deprecated
@Nullable
public Boolean isKeepSegmentGranularity()
{
return keepSegmentGranularity;
}
@JsonProperty
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@Nullable
@JsonProperty
public Long getTargetCompactionSizeBytes()
@ -243,6 +264,7 @@ public class CompactionTask extends AbstractTask
partitionConfigurationManager,
dimensionsSpec,
keepSegmentGranularity,
segmentGranularity,
jsonMapper
).stream()
.map(spec -> new IndexTask(
@ -300,7 +322,8 @@ public class CompactionTask extends AbstractTask
final SegmentProvider segmentProvider,
final PartitionConfigurationManager partitionConfigurationManager,
final DimensionsSpec dimensionsSpec,
@Nullable final Boolean keepSegmentGranularity,
@Nullable final Granularity segmentGranularity,
final ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
{
@ -326,62 +349,83 @@ public class CompactionTask extends AbstractTask
queryableIndexAndSegments
);
if (segmentGranularity == null) {
if (keepSegmentGranularity != null && !keepSegmentGranularity) {
// all granularity
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
queryableIndexAndSegments,
dimensionsSpec,
Granularities.ALL,
jsonMapper
);
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
compactionTuningConfig
)
);
} else {
// original granularity
final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
//noinspection ConstantConditions
queryableIndexAndSegments.forEach(
p -> intervalToSegments.computeIfAbsent(p.rhs.getInterval(), k -> new ArrayList<>())
.add(p)
);
final List<IndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.getValue();
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
interval,
segmentsToCompact,
dimensionsSpec,
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(),
jsonMapper
);
specs.add(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, interval),
compactionTuningConfig
)
);
}
return specs;
}
} else {
if (keepSegmentGranularity != null && keepSegmentGranularity) {
// error
throw new ISE("segmentGranularity[%s] and keepSegmentGranularity can't be used together", segmentGranularity);
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
queryableIndexAndSegments,
dimensionsSpec,
segmentGranularity,
jsonMapper
);
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
compactionTuningConfig
)
);
}
}
}
@ -419,6 +463,7 @@ public class CompactionTask extends AbstractTask
Interval totalInterval,
List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
DimensionsSpec dimensionsSpec,
Granularity segmentGranularity,
ObjectMapper jsonMapper
)
{
@ -447,7 +492,8 @@ public class CompactionTask extends AbstractTask
return isRollup != null && isRollup;
});
final GranularitySpec granularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(segmentGranularity),
Granularities.NONE,
rollup,
Collections.singletonList(totalInterval)


@ -0,0 +1,482 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
public class CompactionTaskRunTest extends IngestionTestBase
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
Collections.emptyList(),
Collections.emptyList()
),
null,
Arrays.asList("ts", "dim", "val"),
false,
0
);
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExecutorService exec;
public CompactionTaskRunTest()
{
TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
}
@Before
public void setup()
{
exec = Execs.multiThreaded(2, "compaction-task-run-test-%d");
}
@After
public void teardown()
{
exec.shutdownNow();
}
@Test
public void testRun() throws Exception
{
runIndexTask();
final CompactionTask compactionTask = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
null,
null,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
Assert.assertTrue(resultPair.lhs.isSuccess());
final List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
}
}
@Test
public void testRunCompactionTwiceWithoutKeepSegmentGranularity() throws Exception
{
runIndexTask();
final CompactionTask compactionTask1 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
false,
null,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
final CompactionTask compactionTask2 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
false,
null,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
resultPair = runTask(compactionTask2);
Assert.assertTrue(resultPair.lhs.isSuccess());
segments = resultPair.rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
}
@Test
public void testRunCompactionTwiceWithKeepSegmentGranularity() throws Exception
{
runIndexTask();
final CompactionTask compactionTask1 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
true,
null,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
}
final CompactionTask compactionTask2 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
true,
null,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
resultPair = runTask(compactionTask2);
Assert.assertTrue(resultPair.lhs.isSuccess());
segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
}
}
@Test
public void testWithSegmentGranularity() throws Exception
{
runIndexTask();
// day segmentGranularity
final CompactionTask compactionTask1 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
null,
Granularities.DAY,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
Assert.assertTrue(resultPair.lhs.isSuccess());
List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(1, segments.size());
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
// hour segmentGranularity
final CompactionTask compactionTask2 = new CompactionTask(
null,
null,
DATA_SOURCE,
Intervals.of("2014-01-01/2014-01-02"),
null,
null,
null,
Granularities.HOUR,
null,
null,
null,
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
resultPair = runTask(compactionTask2);
Assert.assertTrue(resultPair.lhs.isSuccess());
segments = resultPair.rhs;
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
}
}
private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
DEFAULT_PARSE_SPEC,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, false, false, true),
false
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
return runTask(indexTask);
}
private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws Exception
{
getLockbox().add(task);
getTaskStorage().insert(task, TaskStatus.running(task.getId()));
final LocalTaskActionClient actionClient = createActionClient(task);
final File deepStorageDir = temporaryFolder.newFolder();
final ObjectMapper objectMapper = getObjectMapper();
objectMapper.registerSubtypes(
new NamedType(LocalLoadSpec.class, "local")
);
objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
final List<DataSegment> segments = new ArrayList<>();
final DataSegmentPusher pusher = new LocalDataSegmentPusher(
new LocalDataSegmentPusherConfig()
{
@Override
public File getStorageDirectory()
{
return deepStorageDir;
}
},
objectMapper
)
{
@Override
public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
{
segments.add(segment);
return super.push(file, segment, useUniquePath);
}
};
final SegmentLoader loader = new SegmentLoaderLocalCacheManager(
getIndexIO(),
new SegmentLoaderConfig() {
@Override
public List<StorageLocationConfig> getLocations()
{
return ImmutableList.of(
new StorageLocationConfig()
{
@Override
public File getPath()
{
return deepStorageDir;
}
}
);
}
},
objectMapper
);
final TaskToolbox box = new TaskToolbox(
null,
actionClient,
null,
pusher,
new NoopDataSegmentKiller(),
null,
null,
null,
null,
null,
null,
null,
null,
loader,
objectMapper,
temporaryFolder.newFolder(),
getIndexIO(),
null,
null,
null,
getIndexMerger(),
null,
null,
null,
null,
new NoopTestTaskFileWriter()
);
if (task.isReady(box.getTaskActionClient())) {
TaskStatus status = task.run(box);
shutdownTask(task);
Collections.sort(segments);
return Pair.of(status, segments);
} else {
throw new ISE("task is not ready");
}
}
}


@ -54,10 +54,13 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -84,7 +87,7 @@ import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
@ -97,6 +100,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@ -130,26 +134,16 @@ public class CompactionTaskTest
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01");
private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
Intervals.of("2017-01-01/2017-02-01"),
Intervals.of("2017-02-01/2017-03-01"),
Intervals.of("2017-03-01/2017-04-01"),
Intervals.of("2017-04-01/2017-05-01"),
Intervals.of("2017-05-01/2017-06-01"),
Intervals.of("2017-06-01/2017-07-01")
);
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static Map<String, DimensionSchema> DIMENSIONS;
@ -166,12 +160,19 @@ public class CompactionTaskTest
@BeforeClass
public static void setupClass()
{
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-01-01/2017-02-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-02-01/2017-03-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-03-01/2017-04-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-04-01/2017-05-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
DIMENSIONS = new HashMap<>();
AGGREGATORS = new HashMap<>();
DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final StringDimensionSchema schema = new StringDimensionSchema(
"string_dim_" + i,
null,
@ -179,15 +180,15 @@ public class CompactionTaskTest
);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
@ -198,8 +199,8 @@ public class CompactionTaskTest
AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
segmentMap = new HashMap<>(5); segmentMap = new HashMap<>(SEGMENT_INTERVALS.size());
for (int i = 0; i < 5; i++) { for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
segmentMap.put( segmentMap.put(
new DataSegment( new DataSegment(
@ -254,9 +255,9 @@ public class CompactionTaskTest
{
final List<String> dimensions = new ArrayList<>();
dimensions.add(TIMESTAMP_COLUMN);
for (int i = 0; i < 6; i++) {
int postfix = i + startIndex;
postfix = postfix >= 6 ? postfix - 6 : postfix;
dimensions.add("string_dim_" + postfix);
dimensions.add("long_dim_" + postfix);
dimensions.add("float_dim_" + postfix);
@ -335,6 +336,7 @@ public class CompactionTaskTest
null,
null,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@ -366,6 +368,7 @@ public class CompactionTaskTest
null,
null,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@ -395,6 +398,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -408,11 +412,16 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, Granularities.MONTH);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
Granularities.ALL
);
}
}
@ -420,7 +429,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
6,
500000,
1000000L,
null,
@ -451,6 +460,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -464,15 +474,22 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig,
Granularities.ALL
);
}
}
@ -484,7 +501,7 @@ public class CompactionTaskTest
null,
500000,
1000000L,
6L,
null,
null,
null,
@ -512,6 +529,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -525,15 +543,22 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig,
Granularities.ALL
);
}
}
@ -573,6 +598,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, tuningConfig),
null,
keepSegmentGranularity,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -586,15 +612,22 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH
);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
tuningConfig,
Granularities.ALL
);
}
}
@ -635,6 +668,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, TUNING_CONFIG),
customSpec,
keepSegmentGranularity,
null,
objectMapper
);
@ -645,20 +679,22 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(6);
IntStream.range(0, 6).forEach(i -> dimensionsSpecs.add(customSpec));
assertIngestionSchema(
ingestionSpecs,
dimensionsSpecs,
SEGMENT_INTERVALS,
Granularities.MONTH
);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
Collections.singletonList(customSpec),
Collections.singletonList(COMPACTION_INTERVAL),
Granularities.ALL
);
}
}
@ -672,6 +708,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@ -685,11 +722,16 @@ public class CompactionTaskTest
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, Granularities.MONTH);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
Granularities.ALL
);
}
}
@ -709,6 +751,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
null,
objectMapper
);
}
@ -728,6 +771,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
keepSegmentGranularity,
null,
objectMapper
);
}
@ -749,6 +793,7 @@ public class CompactionTaskTest
null,
null,
null,
null,
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
@ -760,7 +805,7 @@ public class CompactionTaskTest
public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException
{
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
6,
500000,
1000000L,
null,
@ -786,17 +831,135 @@ public class CompactionTaskTest
null
);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be used with");
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(6L, tuningConfig),
null,
keepSegmentGranularity,
null,
objectMapper
);
}
@Test
public void testSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
new PeriodGranularity(Period.months(3), null, null),
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null)
);
}
@Test
public void testSegmentGranularityWithFalseKeepSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
false,
new PeriodGranularity(Period.months(3), null, null),
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null)
);
}
@Test
public void testNullSegmentGranularityAndNullKeepSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
new PartitionConfigurationManager(null, TUNING_CONFIG),
null,
null,
null,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
true
);
ingestionSpecs.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
)
);
Assert.assertEquals(6, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
expectedDimensionsSpec,
SEGMENT_INTERVALS,
Granularities.MONTH
);
}
@Test
public void testUseKeepSegmentGranularityAndSegmentGranularityTogether()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("keepSegmentGranularity and segmentGranularity can't be used together");
final CompactionTask task = new CompactionTask(
null,
null,
DATA_SOURCE,
COMPACTION_INTERVAL,
null,
null,
true,
Granularities.YEAR,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
}
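The `IAE` above is thrown at construction time, before any segments are read. A rough sketch of the validation the test exercises (not the actual `CompactionTask` constructor code): a non-null `segmentGranularity` is rejected whenever `keepSegmentGranularity` is explicitly `true`.

```java
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularity;

// Hypothetical sketch of the guard the test expects; the real constructor may differ in detail.
final class GranularityOptionsCheck
{
  static void validate(Boolean keepSegmentGranularity, Granularity segmentGranularity)
  {
    if (segmentGranularity != null && Boolean.TRUE.equals(keepSegmentGranularity)) {
      throw new IAE("keepSegmentGranularity and segmentGranularity can't be used together");
    }
  }
}
```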
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity) private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
{ {
if (keepSegmentGranularity) { if (keepSegmentGranularity) {
@ -805,6 +968,7 @@ public class CompactionTaskTest
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))), new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))) new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
); );
} else { } else {
@ -838,6 +1002,10 @@ public class CompactionTaskTest
new LongDimensionSchema("long_dim_3"), new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"), new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"), new DoubleDimensionSchema("double_dim_3"),
new StringDimensionSchema("string_dim_5"),
new LongDimensionSchema("long_dim_5"),
new FloatDimensionSchema("float_dim_5"),
new DoubleDimensionSchema("double_dim_5"),
mixedTypeColumn mixedTypeColumn
); );
} }
@ -845,7 +1013,8 @@ public class CompactionTaskTest
private static void assertIngestionSchema( private static void assertIngestionSchema(
List<IndexIngestionSpec> ingestionSchemas, List<IndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs, List<DimensionsSpec> expectedDimensionsSpecs,
List<Interval> expectedSegmentIntervals List<Interval> expectedSegmentIntervals,
Granularity expectedSegmentGranularity
) )
{ {
assertIngestionSchema( assertIngestionSchema(
@ -877,7 +1046,8 @@ public class CompactionTaskTest
null, null,
null, null,
null null
) ),
expectedSegmentGranularity
); );
} }
@ -885,7 +1055,8 @@ public class CompactionTaskTest
List<IndexIngestionSpec> ingestionSchemas, List<IndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs, List<DimensionsSpec> expectedDimensionsSpecs,
List<Interval> expectedSegmentIntervals, List<Interval> expectedSegmentIntervals,
IndexTuningConfig expectedTuningConfig IndexTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity
) )
{ {
Preconditions.checkArgument( Preconditions.checkArgument(
@ -917,7 +1088,8 @@ public class CompactionTaskTest
.collect(Collectors.toSet()); .collect(Collectors.toSet());
Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators()))); Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
Assert.assertEquals( Assert.assertEquals(
new ArbitraryGranularitySpec( new UniformGranularitySpec(
expectedSegmentGranularity,
Granularities.NONE, Granularities.NONE,
false, false,
Collections.singletonList(expectedSegmentIntervals.get(i)) Collections.singletonList(expectedSegmentIntervals.get(i))

View File

@ -1739,7 +1739,7 @@ public class IndexTaskTest
); );
} }
private static IndexTuningConfig createTuningConfig( static IndexTuningConfig createTuningConfig(
@Nullable Integer targetPartitionSize, @Nullable Integer targetPartitionSize,
@Nullable Integer maxRowsInMemory, @Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory, @Nullable Long maxBytesInMemory,

View File

@ -20,40 +20,70 @@
package org.apache.druid.indexing.common.task; package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters; import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClient; import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionToolbox; import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import java.io.File;
import java.util.Collections;
import java.util.Map;
public abstract class IngestionTestBase public abstract class IngestionTestBase
{ {
public static final String DATA_SOURCE = "test";
@Rule @Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private final TestUtils testUtils = new TestUtils(); private final TestUtils testUtils = new TestUtils();
private final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
private final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); private TaskStorage taskStorage;
private final TaskLockbox lockbox = new TaskLockbox(taskStorage); private IndexerSQLMetadataStorageCoordinator storageCoordinator;
private TaskLockbox lockbox;
public IngestionTestBase() @Before
public void setUp()
{ {
final SQLMetadataConnector connector = derbyConnectorRule.getConnector();
connector.createTaskTables();
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector()
);
lockbox = new TaskLockbox(taskStorage);
} }
public TaskActionClient createActionClient(Task task) public LocalTaskActionClient createActionClient(Task task)
{ {
return new LocalTaskActionClient(task, taskStorage, createTaskActionToolbox(), new TaskAuditLogConfig(false)); return new LocalTaskActionClient(task, taskStorage, createTaskActionToolbox(), new TaskAuditLogConfig(false));
} }
@ -64,6 +94,11 @@ public abstract class IngestionTestBase
taskStorage.insert(task, TaskStatus.running(task.getId())); taskStorage.insert(task, TaskStatus.running(task.getId()));
} }
public void shutdownTask(Task task)
{
lockbox.remove(task);
}
public ObjectMapper getObjectMapper() public ObjectMapper getObjectMapper()
{ {
return objectMapper; return objectMapper;
@ -81,11 +116,6 @@ public abstract class IngestionTestBase
public TaskActionToolbox createTaskActionToolbox() public TaskActionToolbox createTaskActionToolbox()
{ {
final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector()
);
storageCoordinator.start(); storageCoordinator.start();
return new TaskActionToolbox( return new TaskActionToolbox(
lockbox, lockbox,
@ -106,4 +136,54 @@ public abstract class IngestionTestBase
{ {
return testUtils.getTestIndexMergerV9(); return testUtils.getTestIndexMergerV9();
} }
public IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
ParseSpec parseSpec,
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return createIngestionSpec(baseDir, parseSpec, TransformSpec.NONE, granularitySpec, tuningConfig, appendToExisting);
}
public IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
ParseSpec parseSpec,
TransformSpec transformSpec,
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return new IndexTask.IndexIngestionSpec(
new DataSchema(
DATA_SOURCE,
objectMapper.convertValue(
new StringInputRowParser(parseSpec, null),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
granularitySpec != null ? granularitySpec : new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
Collections.singletonList(Intervals.of("2014/2015"))
),
transformSpec,
objectMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
baseDir,
"druid*",
null
),
appendToExisting
),
tuningConfig
);
}
} }
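For orientation, roughly how a concrete test built on this base class might call the new helper; `tmpDir`, `parseSpec`, and `tuningConfig` are assumed fixtures of that subclass (for example a JUnit `TemporaryFolder` directory and a CSV parse spec), not members of `IngestionTestBase`:

```java
// Hypothetical subclass usage of createIngestionSpec; fixture names are illustrative.
final IndexTask.IndexIngestionSpec ingestionSpec = createIngestionSpec(
    tmpDir,       // directory the LocalFirehoseFactory scans for "druid*" files
    parseSpec,    // row parser matching the files written by the test
    null,         // null falls back to the default UniformGranularitySpec(DAY, MINUTE, 2014/2015)
    tuningConfig, // any IndexTuningConfig
    false         // overwrite rather than append to existing segments
);
```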

View File

@ -143,7 +143,7 @@ public class UniformGranularitySpec implements GranularitySpec
UniformGranularitySpec that = (UniformGranularitySpec) o; UniformGranularitySpec that = (UniformGranularitySpec) o;
if (segmentGranularity != that.segmentGranularity) { if (!segmentGranularity.equals(that.segmentGranularity)) {
return false; return false;
} }
if (!queryGranularity.equals(that.queryGranularity)) { if (!queryGranularity.equals(that.queryGranularity)) {
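The one-line change above replaces a reference comparison with value equality. Built-in granularities such as `Granularities.MONTH` are shared instances, so `==` happens to work for them, but a `PeriodGranularity` built per task is a fresh object and only compares equal via `equals()`. A small standalone illustration (not taken from the patch):

```java
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Period;

// Demonstrates why UniformGranularitySpec.equals must use equals() on segmentGranularity.
class GranularityEqualityDemo
{
  public static void main(String[] args)
  {
    final Granularity a = new PeriodGranularity(Period.months(3), null, null);
    final Granularity b = new PeriodGranularity(Period.months(3), null, null);
    System.out.println(a == b);      // false: two distinct instances
    System.out.println(a.equals(b)); // true: same period, origin, and time zone
  }
}
```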