Add compaction task (#4985)

* Add compaction task

* added doc

* use combining aggregators

* address comments

* add support for dimensionsSpec

* fix getUniqueDims and getUniqueMetics

* find unique dimensionsSpec

* fix compilation

* add unit test

* fix test

* fix test

* test for different dimension orderings and types, and doc for type and ordering

* add control for custom ordering and type

* update doc

* fix compile

* fix compile

* add segments param

* fix serde error

* fix build
This commit is contained in:
Jihoon Son 2017-11-04 12:55:27 +09:00 committed by Gian Merlino
parent 5eb08c27cb
commit 5f3c863d5e
45 changed files with 1751 additions and 348 deletions

View File

@ -29,6 +29,8 @@ import com.google.common.base.Preconditions;
import io.druid.guice.annotations.PublicApi;
import io.druid.java.util.common.StringUtils;
import java.util.Objects;
/**
*/
@PublicApi
@ -116,7 +118,7 @@ public abstract class DimensionSchema
protected DimensionSchema(String name, MultiValueHandling multiValueHandling)
{
this.name = Preconditions.checkNotNull(name, "Dimension name cannot be null.");
this.multiValueHandling = multiValueHandling;
this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
}
@JsonProperty
@ -149,13 +151,30 @@ public abstract class DimensionSchema
DimensionSchema that = (DimensionSchema) o;
return name.equals(that.name);
if (!name.equals(that.name)) {
return false;
}
if (!getValueType().equals(that.getValueType())) {
return false;
}
return Objects.equals(multiValueHandling, that.multiValueHandling);
}
@Override
public int hashCode()
{
return name.hashCode();
return Objects.hash(name, getValueType(), multiValueHandling);
}
@Override
public String toString()
{
return "DimensionSchema{" +
"name='" + name + "'" +
", valueType='" + getValueType() + "'" +
", multiValueHandling='" + getMultiValueHandling() + "'" +
"}";
}
}

View File

@ -43,6 +43,7 @@ public class NoopInputRowParser implements InputRowParser<InputRow>
return input;
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{

View File

@ -21,7 +21,7 @@ package io.druid.data.input.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
/**

View File

@ -79,7 +79,7 @@ A sample ingest firehose spec is shown below -
|interval|A String representing ISO-8601 Interval. This defines the time range to fetch the data over.|yes|
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.html)|yes|
|filter| See [Filters](../querying/filters.html)|no|
#### CombiningFirehose

View File

@ -104,7 +104,7 @@ Tasks can have different default priorities depening on their types. Here are a
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Merge/Append task|25|
|Merge/Append/Compaction task|25|
|Other tasks|0|
You can override the task priority by setting your priority in the task context like below.
@ -184,19 +184,6 @@ On the contrary, in the incremental publishing mode, segments are incrementally
To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.
### Task Context
The task context is used for various task configuration parameters. The following parameters apply to all tasks.
|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).|
<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
</div>
Segment Merging Tasks
---------------------
@ -210,7 +197,8 @@ Append tasks append a list of segments together into a single segment (one after
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>,
"aggregations": <optional list of aggregators>
"aggregations": <optional list of aggregators>,
"context": <task context>
}
```
@ -228,7 +216,8 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
"segments": <JSON list of DataSegment objects to merge>,
"context": <task context>
}
```
@ -245,10 +234,67 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"interval": <DataSegment objects in this interval are going to be merged>
"interval": <DataSegment objects in this interval are going to be merged>,
"context": <task context>
}
```
### Compaction Task
Compaction tasks merge all segments of the given interval. The syntax is:
```json
{
"type": "compact",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"dimensions" <custom dimensionsSpec>,
"tuningConfig" <index task tuningConfig>,
"context": <task context>
}
```
|Field|Description|Required|
|-----|-----------|--------|
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|dataSource name to be compacted|Yes|
|`interval`|interval of segments to be compacted|Yes|
|`dimensions`|custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
|`tuningConfig`|[Index task tuningConfig](#tuningconfig)|No|
|`context`|[Task context](#taskcontext)|No|
An example of compaction task is
```json
{
"type" : "compact",
"dataSource" : "wikipedia",
"interval" : "2017-01-01/2018-01-01"
}
```
This compaction task merges _all segments_ of the interval `2017-01-01/2018-01-01` into a _single segment_.
To merge each day's worth of data into a separate segment, you can submit multiple `compact` tasks, one for each day. They will run in parallel.
A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
For example, its `firehose` is always the [ingestSegmentSpec](./firehose.html), and `dimensionsSpec` and `metricsSpec`
include all dimensions and metrics of the input segments by default.
The output segment can have different metadata from the input segments unless all input segments have the same metadata.
- Dimensions: since Druid supports schema change, the dimensions can be different across segments even if they are a part of the same dataSource.
If the input segments have different dimensions, the output segment basically includes all dimensions of the input segments.
However, even if the input segments have the same set of dimensions, the dimension order or the data type of dimensions can be different. For example, the data type of some dimensions can be
changed from `string` to primitive types, or the order of dimensions can be changed for better locality (See [Partitioning](batch-ingestion.html#partitioning-specification)).
In this case, the dimensions of recent segments precede that of old segments in terms of data types and the ordering.
This is because more recent segments are more likely to have the new desired order and data types. If you want to use
your own ordering and types, you can specify a custom `dimensionsSpec` in the compaction task spec.
- Roll-up: the output segment is rolled up only when `rollup` is set for all input segments.
See [Roll-up](../design/index.html#roll-up) for more details.
You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
Segment Destroying Tasks
------------------------
@ -261,7 +307,8 @@ Kill tasks delete all information about a segment and removes it from deep stora
"type": "kill",
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_segments_in_this_interval_will_die!>
"interval" : <all_segments_in_this_interval_will_die!>,
"context": <task context>
}
```
@ -342,6 +389,21 @@ These tasks start, sleep for a time and are used only for testing. The available
}
```
Task Context
------------
The task context is used for various task configuration parameters. The following parameters apply to all task types.
|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).|
|priority|Different based on task types. See [Task Priority](#task-priority).|Task priority|
<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
</div>
Locking
-------

View File

@ -190,7 +190,7 @@ public class VarianceAggregatorFactory extends AggregatorFactory
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (Objects.equals(getName(), other.getName()) && this.getClass() == other.getClass()) {
if (Objects.equals(getName(), other.getName()) && other instanceof VarianceAggregatorFactory) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);

View File

@ -33,7 +33,9 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -80,15 +82,27 @@ public abstract class AbstractTask implements Task
this.context = context;
}
public static String makeId(String id, final String typeName, String dataSource, Interval interval)
static String getOrMakeId(String id, final String typeName, String dataSource)
{
return id != null ? id : joinId(
typeName,
dataSource,
interval.getStart(),
interval.getEnd(),
DateTimes.nowUtc().toString()
);
return getOrMakeId(id, typeName, dataSource, null);
}
static String getOrMakeId(String id, final String typeName, String dataSource, @Nullable Interval interval)
{
if (id != null) {
return id;
}
final List<Object> objects = new ArrayList<>();
objects.add(typeName);
objects.add(dataSource);
if (interval != null) {
objects.add(interval.getStart());
objects.add(interval.getEnd());
}
objects.add(DateTimes.nowUtc().toString());
return joinId(objects);
}
@JsonProperty
@ -167,7 +181,12 @@ public abstract class AbstractTask implements Task
*
* @return string of joined objects
*/
public static String joinId(Object... objects)
static String joinId(List<Object> objects)
{
return ID_JOINER.join(objects);
}
static String joinId(Object...objects)
{
return ID_JOINER.join(objects);
}
@ -202,7 +221,7 @@ public abstract class AbstractTask implements Task
return id.hashCode();
}
protected List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
{
return client.submit(new LockListAction());
}

View File

@ -47,7 +47,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
)
{
super(
makeId(id, "archive", dataSource, interval),
getOrMakeId(id, "archive", dataSource, interval),
dataSource,
interval,
context

View File

@ -0,0 +1,495 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.DoubleDimensionSchema;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.NoopInputRowParser;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.NoneGranularity;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.DimensionHandler;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CompactionTask extends AbstractTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
private final Interval interval;
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
private final IndexTuningConfig tuningConfig;
private final Injector injector;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
private IndexTask indexTaskSpec;
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject Injector injector,
@JacksonInject ObjectMapper jsonMapper
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
Preconditions.checkArgument(interval != null || segments != null, "interval or segments should be specified");
Preconditions.checkArgument(interval == null || segments == null, "one of interval and segments should be null");
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
this.tuningConfig = tuningConfig;
this.injector = injector;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public DimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
@JsonProperty
public IndexTuningConfig getTuningConfig()
{
return tuningConfig;
}
@Override
public String getType()
{
return TYPE;
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@VisibleForTesting
SegmentProvider getSegmentProvider()
{
return segmentProvider;
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final SortedSet<Interval> intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
intervals.add(segmentProvider.interval);
return IndexTask.isReady(taskActionClient, intervals);
}
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
if (indexTaskSpec == null) {
final IndexIngestionSpec ingestionSpec = createIngestionSchema(
toolbox,
segmentProvider,
dimensionsSpec,
tuningConfig,
injector,
jsonMapper
);
indexTaskSpec = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
ingestionSpec,
getContext()
);
}
if (indexTaskSpec.getIngestionSchema() == null) {
log.info("Cannot find segments for interval");
}
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
log.info("Generated compaction task details: " + json);
return indexTaskSpec.run(toolbox);
}
@VisibleForTesting
static IndexIngestionSpec createIngestionSchema(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
DimensionsSpec dimensionsSpec,
IndexTuningConfig tuningConfig,
Injector injector,
ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
{
Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
toolbox,
segmentProvider
);
final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
if (timelineSegments.size() == 0) {
return null;
}
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
dimensionsSpec,
toolbox.getIndexIO(),
jsonMapper,
timelineSegments,
segmentFileMap
);
return new IndexIngestionSpec(
dataSchema,
new IndexIOConfig(
new IngestSegmentFirehoseFactory(
segmentProvider.dataSource,
segmentProvider.interval,
null, // no filter
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
injector,
toolbox.getIndexIO()
),
false
),
tuningConfig
);
}
private static Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> prepareSegments(
TaskToolbox toolbox,
SegmentProvider segmentProvider
) throws IOException, SegmentLoadingException
{
final List<DataSegment> usedSegments = segmentProvider.checkAndGetSegments(toolbox);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return Pair.of(segmentFileMap, timelineSegments);
}
private static DataSchema createDataSchema(
String dataSource,
Interval interval,
DimensionsSpec dimensionsSpec,
IndexIO indexIO,
ObjectMapper jsonMapper,
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
Map<DataSegment, File> segmentFileMap
)
throws IOException, SegmentLoadingException
{
// find metadata for interval
final List<QueryableIndex> queryableIndices = loadSegments(timelineSegments, segmentFileMap, indexIO);
// find merged aggregators
final List<AggregatorFactory[]> aggregatorFactories = queryableIndices
.stream()
.map(index -> index.getMetadata().getAggregators())
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
if (mergedAggregators == null) {
throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
}
// find granularity spec
// set rollup only if rollup is set for all segments
final boolean rollup = queryableIndices.stream().allMatch(index -> index.getMetadata().isRollup());
final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
new NoneGranularity(),
rollup,
ImmutableList.of(interval)
);
// find unique dimensions
final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ?
createDimensionsSpec(queryableIndices) :
dimensionsSpec;
final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec));
return new DataSchema(
dataSource,
jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT),
mergedAggregators,
granularitySpec,
null,
jsonMapper
);
}
private static DimensionsSpec createDimensionsSpec(List<QueryableIndex> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
// Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be
// optimized for performance.
// Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more
// frequently, and thus the performance should be optimized for recent ones rather than old ones.
// timelineSegments are sorted in order of interval
int index = 0;
for (QueryableIndex queryableIndex : Lists.reverse(queryableIndices)) {
final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
for (String dimension : queryableIndex.getAvailableDimensions()) {
final Column column = Preconditions.checkNotNull(
queryableIndex.getColumn(dimension),
"Cannot find column for dimension[%s]",
dimension
);
if (!uniqueDims.containsKey(dimension)) {
final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
);
uniqueDims.put(dimension, index++);
dimensionSchemaMap.put(
dimension,
createDimensionSchema(
column.getCapabilities().getType(),
dimension,
dimensionHandler.getMultivalueHandling()
)
);
}
}
}
final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
final List<DimensionSchema> dimensionSchemas = IntStream.range(0, orderedDims.size())
.mapToObj(i -> {
final String dimName = orderedDims.get(i);
return Preconditions.checkNotNull(
dimensionSchemaMap.get(dimName),
"Cannot find dimension[%s] from dimensionSchemaMap",
dimName
);
})
.collect(Collectors.toList());
return new DimensionsSpec(dimensionSchemas, null, null);
}
private static List<QueryableIndex> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<QueryableIndex> segments = new ArrayList<>();
for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
final DataSegment segment = chunk.getObject();
segments.add(
indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier())
)
);
}
}
return segments;
}
private static DimensionSchema createDimensionSchema(
ValueType type,
String name,
MultiValueHandling multiValueHandling
)
{
switch (type) {
case FLOAT:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for float type yet",
name
);
return new FloatDimensionSchema(name);
case LONG:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for long type yet",
name
);
return new LongDimensionSchema(name);
case DOUBLE:
Preconditions.checkArgument(
multiValueHandling == null,
"multi-value dimension [%s] is not supported for double type yet",
name
);
return new DoubleDimensionSchema(name);
case STRING:
return new StringDimensionSchema(name, multiValueHandling);
default:
throw new ISE("Unsupported value type[%s] for dimension[%s]", type, name);
}
}
@VisibleForTesting
static class SegmentProvider
{
private final String dataSource;
private final Interval interval;
private final List<DataSegment> segments;
SegmentProvider(String dataSource, Interval interval)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
this.interval = Preconditions.checkNotNull(interval);
this.segments = null;
}
SegmentProvider(List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && !segments.isEmpty());
final String dataSource = segments.get(0).getDataSource();
Preconditions.checkArgument(
segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)),
"segments should have the same dataSource"
);
this.segments = segments;
this.dataSource = dataSource;
this.interval = JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
}
List<DataSegment> getSegments()
{
return segments;
}
List<DataSegment> checkAndGetSegments(TaskToolbox toolbox) throws IOException
{
final List<DataSegment> usedSegments = toolbox.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval, null));
if (segments != null) {
Collections.sort(usedSegments);
Collections.sort(segments);
Preconditions.checkState(
usedSegments.equals(segments),
"Specified segments[%s] are different from the currently used segments[%s]",
segments,
usedSegments
);
}
return usedSegments;
}
}
}

View File

@ -69,7 +69,7 @@ public class HadoopConverterTask extends ConvertSegmentTask
)
{
super(
makeId(
getOrMakeId(
id,
TYPE,
Preconditions.checkNotNull(dataSource, "dataSource"),

View File

@ -36,8 +36,6 @@ import com.google.common.collect.Iterables;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.JodaUtils;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -53,6 +51,7 @@ import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Comparators;
@ -97,6 +96,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
@ -115,31 +115,6 @@ public class IndexTask extends AbstractTask
private static final HashFunction hashFunction = Hashing.murmur3_128();
private static final String TYPE = "index";
private static String makeId(String id, IndexIngestionSpec ingestionSchema)
{
if (id != null) {
return id;
} else {
return StringUtils.format("index_%s_%s", makeDataSource(ingestionSchema), DateTimes.nowUtc());
}
}
private static String makeGroupId(IndexIngestionSpec ingestionSchema)
{
if (ingestionSchema.getIOConfig().isAppendToExisting()) {
// Shared locking group for all tasks that append, since they are OK to run concurrently.
return StringUtils.format("%s_append_%s", TYPE, ingestionSchema.getDataSchema().getDataSource());
} else {
// Return null, one locking group per task.
return null;
}
}
private static String makeDataSource(IndexIngestionSpec ingestionSchema)
{
return ingestionSchema.getDataSchema().getDataSource();
}
@JsonIgnore
private final IndexIngestionSpec ingestionSchema;
@ -151,17 +126,52 @@ public class IndexTask extends AbstractTask
@JsonProperty("context") final Map<String, Object> context
)
{
super(
makeId(id, ingestionSchema),
this(
id,
makeGroupId(ingestionSchema),
taskResource,
makeDataSource(ingestionSchema),
ingestionSchema.dataSchema.getDataSource(),
ingestionSchema,
context
);
}
IndexTask(
String id,
String groupId,
TaskResource resource,
String dataSource,
IndexIngestionSpec ingestionSchema,
Map<String, Object> context
)
{
super(
getOrMakeId(id, TYPE, dataSource),
groupId,
resource,
dataSource,
context
);
this.ingestionSchema = ingestionSchema;
}
private static String makeGroupId(IndexIngestionSpec ingestionSchema)
{
return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource());
}
private static String makeGroupId(boolean isAppendToExisting, String dataSource)
{
if (isAppendToExisting) {
// Shared locking group for all tasks that append, since they are OK to run concurrently.
return StringUtils.format("%s_append_%s", TYPE, dataSource);
} else {
// Return null, one locking group per task.
return null;
}
}
@Override
public int getPriority()
{
@ -182,21 +192,26 @@ public class IndexTask extends AbstractTask
.bucketIntervals();
if (intervals.isPresent()) {
final List<TaskLock> locks = getTaskLocks(taskActionClient);
if (locks.size() == 0) {
try {
Tasks.tryAcquireExclusiveLocks(taskActionClient, intervals.get());
}
catch (Exception e) {
return false;
}
}
return true;
return isReady(taskActionClient, intervals.get());
} else {
return true;
}
}
static boolean isReady(TaskActionClient actionClient, SortedSet<Interval> intervals) throws IOException
{
final List<TaskLock> locks = getTaskLocks(actionClient);
if (locks.size() == 0) {
try {
Tasks.tryAcquireExclusiveLocks(actionClient, intervals);
}
catch (Exception e) {
return false;
}
}
return true;
}
@JsonProperty("spec")
public IndexIngestionSpec getIngestionSchema()
{
@ -1136,5 +1151,83 @@ public class IndexTask extends AbstractTask
{
return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final IndexTuningConfig that = (IndexTuningConfig) o;
if (!Objects.equals(targetPartitionSize, that.targetPartitionSize)) {
return false;
}
if (maxRowsInMemory != that.maxRowsInMemory) {
return false;
}
if (maxTotalRows != that.maxTotalRows) {
return false;
}
if (!Objects.equals(numShards, that.numShards)) {
return false;
}
if (!Objects.equals(indexSpec, that.indexSpec)) {
return false;
}
if (!Objects.equals(basePersistDirectory, that.basePersistDirectory)) {
return false;
}
if (maxPendingPersists != that.maxPendingPersists) {
return false;
}
if (forceExtendableShardSpecs != that.forceExtendableShardSpecs) {
return false;
}
if (forceGuaranteedRollup != that.forceGuaranteedRollup) {
return false;
}
if (reportParseExceptions != that.reportParseExceptions) {
return false;
}
if (publishTimeout != that.publishTimeout) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return Objects.hash(
targetPartitionSize,
maxRowsInMemory,
maxTotalRows,
numShards,
indexSpec,
basePersistDirectory,
maxPendingPersists,
forceExtendableShardSpecs,
forceGuaranteedRollup,
reportParseExceptions,
publishTimeout
);
}
}
}

View File

@ -52,7 +52,7 @@ public class KillTask extends AbstractFixedIntervalTask
)
{
super(
makeId(id, "kill", dataSource, interval),
getOrMakeId(id, "kill", dataSource, interval),
dataSource,
interval,
context

View File

@ -55,7 +55,7 @@ public class MoveTask extends AbstractFixedIntervalTask
)
{
super(
makeId(id, "move", dataSource, interval),
getOrMakeId(id, "move", dataSource, interval),
dataSource,
interval,
context

View File

@ -48,7 +48,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
)
{
super(
makeId(id, "restore", dataSource, interval),
getOrMakeId(id, "restore", dataSource, interval),
dataSource,
interval,
context

View File

@ -60,7 +60,8 @@ import java.util.Map;
@JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
@JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
@JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
@JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class)
@JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
@JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{

View File

@ -24,8 +24,6 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.overlord.LockResult;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
@ -49,13 +47,6 @@ public class Tasks
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static void checkLockResult(LockResult result, Interval interval)
{
if (!result.isOk()) {
throw new ISE("Failed to lock for interval[%s]", interval);
}
}
public static Map<Interval, TaskLock> tryAcquireExclusiveLocks(TaskActionClient client, SortedSet<Interval> intervals)
throws IOException
{

View File

@ -22,12 +22,14 @@ package io.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
@ -37,7 +39,6 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
@ -52,11 +53,14 @@ import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
{
@ -144,16 +148,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval, null));
final Map<DataSegment, File> segmentFileMap = taskToolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Comparators.naturalNullsFirst()
);
for (DataSegment segment : usedSegments) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(
interval
);
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(interval);
final List<String> dims;
if (dimensions != null) {
@ -161,83 +158,13 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
} else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
} else {
Set<String> dimSet = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<String>>()
{
@Override
public Iterable<String> apply(
TimelineObjectHolder<String, DataSegment> timelineObjectHolder
)
{
return Iterables.concat(
Iterables.transform(
timelineObjectHolder.getObject(),
new Function<PartitionChunk<DataSegment>, Iterable<String>>()
{
@Override
public Iterable<String> apply(PartitionChunk<DataSegment> input)
{
return input.getObject().getDimensions();
}
}
)
);
}
}
)
)
);
dims = Lists.newArrayList(
Sets.difference(
dimSet,
inputRowParser
.getParseSpec()
.getDimensionsSpec()
.getDimensionExclusions()
)
dims = getUniqueDimensions(
timeLineSegments,
inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
);
}
final List<String> metricsList;
if (metrics != null) {
metricsList = metrics;
} else {
Set<String> metricsSet = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<String>>()
{
@Override
public Iterable<String> apply(
TimelineObjectHolder<String, DataSegment> input
)
{
return Iterables.concat(
Iterables.transform(
input.getObject(),
new Function<PartitionChunk<DataSegment>, Iterable<String>>()
{
@Override
public Iterable<String> apply(PartitionChunk<DataSegment> input)
{
return input.getObject().getMetrics();
}
}
)
);
}
}
)
)
);
metricsList = Lists.newArrayList(metricsSet);
}
final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
Iterables.concat(
@ -288,6 +215,63 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
catch (IOException | SegmentLoadingException e) {
throw Throwables.propagate(e);
}
}
@VisibleForTesting
static List<String> getUniqueDimensions(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
@Nullable Set<String> excludeDimensions
)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
// Here, we try to retain the order of dimensions as they were specified since the order of dimensions may be
// optimized for performance.
// Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more
// frequently, and thus the performance should be optimized for recent ones rather than old ones.
// timelineSegments are sorted in order of interval
int index = 0;
for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String dimension : chunk.getObject().getDimensions()) {
if (!uniqueDims.containsKey(dimension) &&
(excludeDimensions == null || !excludeDimensions.contains(dimension))) {
uniqueDims.put(dimension, index++);
}
}
}
}
final BiMap<Integer, String> orderedDims = uniqueDims.inverse();
return IntStream.range(0, orderedDims.size())
.mapToObj(orderedDims::get)
.collect(Collectors.toList());
}
@VisibleForTesting
static List<String> getUniqueMetrics(List<TimelineObjectHolder<String, DataSegment>> timelineSegments)
{
final BiMap<String, Integer> uniqueMetrics = HashBiMap.create();
// Here, we try to retain the order of metrics as they were specified. Metrics are extracted from the recent
// segments to olders.
// timelineSegments are sorted in order of interval
int index = 0;
for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String metric : chunk.getObject().getMetrics()) {
if (!uniqueMetrics.containsKey(metric)) {
uniqueMetrics.put(metric, index++);
}
}
}
}
final BiMap<Integer, String> orderedMetrics = uniqueMetrics.inverse();
return IntStream.range(0, orderedMetrics.size())
.mapToObj(orderedMetrics::get)
.collect(Collectors.toList());
}
}

View File

@ -53,7 +53,7 @@ public class TestUtils
public TestUtils()
{
jsonMapper = new DefaultObjectMapper();
this.jsonMapper = new DefaultObjectMapper();
indexIO = new IndexIO(
jsonMapper,
new ColumnConfig()

View File

@ -0,0 +1,624 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.DoubleDimensionSchema;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.NoopInputRowParser;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.guice.GuiceAnnotationIntrospector;
import io.druid.guice.GuiceInjectableValues;
import io.druid.guice.GuiceInjectors;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.CompactionTask.SegmentProvider;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.Metadata;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SimpleQueryableIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.CompressedObjectStrategy.CompressionStrategy;
import io.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.transform.TransformingInputRowParser;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
public class CompactionTaskTest
{
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
Intervals.of("2017-01-01/2017-02-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN, null),
Intervals.of("2017-02-01/2017-03-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN, null),
Intervals.of("2017-03-01/2017-04-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN, null),
Intervals.of("2017-04-01/2017-05-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN, null),
Intervals.of("2017-05-01/2017-06-01"),
new DoubleDimensionSchema(MIXED_TYPE_COLUMN)
);
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final Injector INJECTOR = GuiceInjectors.makeStartupInjector();
private static Map<String, DimensionSchema> DIMENSIONS;
private static Map<String, AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static TaskToolbox toolbox;
@BeforeClass
public static void setup()
{
DIMENSIONS = new HashMap<>();
AGGREGATORS = new HashMap<>();
DIMENSIONS.put(Column.TIME_COLUMN_NAME, new LongDimensionSchema(Column.TIME_COLUMN_NAME));
DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
for (int i = 0; i < 5; i++) {
final StringDimensionSchema schema = new StringDimensionSchema(
"string_dim_" + i,
null
);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < 5; i++) {
final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < 5; i++) {
final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
for (int i = 0; i < 5; i++) {
final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i);
DIMENSIONS.put(schema.getName(), schema);
}
AGGREGATORS.put("agg_0", new CountAggregatorFactory("agg_0"));
AGGREGATORS.put("agg_1", new LongSumAggregatorFactory("agg_1", "long_dim_1"));
AGGREGATORS.put("agg_2", new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
final Map<DataSegment, File> segmentMap = new HashMap<>(5);
for (int i = 0; i < 5; i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
segmentMap.put(
new DataSegment(
DATA_SOURCE,
segmentInterval,
"version",
ImmutableMap.of(),
findDimensions(i, segmentInterval),
new ArrayList<>(AGGREGATORS.keySet()),
new NumberedShardSpec(0, 1),
0,
1
),
new File("file_" + i)
);
}
SEGMENTS = new ArrayList<>(segmentMap.keySet());
toolbox = new TestTaskToolbox(
new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
new TestIndexIO(objectMapper, segmentMap),
segmentMap
);
}
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
{
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
objectMapper.setInjectableValues(new GuiceInjectableValues(INJECTOR));
objectMapper.registerModule(
new SimpleModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))
);
return objectMapper;
}
private static List<String> findDimensions(int startIndex, Interval segmentInterval)
{
final List<String> dimensions = new ArrayList<>();
dimensions.add(TIMESTAMP_COLUMN);
for (int i = 0; i < 5; i++) {
int postfix = i + startIndex;
postfix = postfix >= 5 ? postfix - 5 : postfix;
dimensions.add("string_dim_" + postfix);
dimensions.add("long_dim_" + postfix);
dimensions.add("float_dim_" + postfix);
dimensions.add("double_dim_" + postfix);
}
dimensions.add(MIXED_TYPE_COLUMN_MAP.get(segmentInterval).getName());
return dimensions;
}
private static IndexTuningConfig createTuningConfig()
{
return new IndexTuningConfig(
5000000,
500000,
1000000,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
5000,
true,
false,
true,
false,
100L
);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerdeWithInterval() throws IOException
{
final CompactionTask task = new CompactionTask(
null,
null,
DATA_SOURCE,
COMPACTION_INTERVAL,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
INJECTOR,
objectMapper
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
Assert.assertEquals(task.getType(), fromJson.getType());
Assert.assertEquals(task.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(task.getInterval(), fromJson.getInterval());
Assert.assertEquals(task.getSegments(), fromJson.getSegments());
Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(task.getContext(), fromJson.getContext());
Assert.assertNull(fromJson.getSegmentProvider().getSegments());
}
@Test
public void testSerdeWithSegments() throws IOException
{
final CompactionTask task = new CompactionTask(
null,
null,
DATA_SOURCE,
null,
SEGMENTS,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
INJECTOR,
objectMapper
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
Assert.assertEquals(task.getType(), fromJson.getType());
Assert.assertEquals(task.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(task.getInterval(), fromJson.getInterval());
Assert.assertEquals(task.getSegments(), fromJson.getSegments());
Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(task.getContext(), fromJson.getContext());
}
@Test
public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
{
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
}
@Test
public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOException, SegmentLoadingException
{
final DimensionsSpec customSpec = new DimensionsSpec(
Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_0"),
new StringDimensionSchema("string_dim_1"),
new StringDimensionSchema("string_dim_2"),
new StringDimensionSchema("string_dim_3"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_0"),
new LongDimensionSchema("long_dim_1"),
new LongDimensionSchema("long_dim_2"),
new LongDimensionSchema("long_dim_3"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_0"),
new FloatDimensionSchema("float_dim_1"),
new FloatDimensionSchema("float_dim_2"),
new FloatDimensionSchema("float_dim_3"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_0"),
new DoubleDimensionSchema("double_dim_1"),
new DoubleDimensionSchema("double_dim_2"),
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema(MIXED_TYPE_COLUMN)
),
null,
null
);
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
customSpec,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
assertIngestionSchema(ingestionSchema, customSpec);
}
@Test
public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
{
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(SEGMENTS),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
}
@Test
public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOException, SegmentLoadingException
{
expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage(CoreMatchers.containsString("are different from the currently used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
segments.remove(0);
CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(segments),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
}
private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
{
return new DimensionsSpec(
Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema("string_dim_0"),
new LongDimensionSchema("long_dim_0"),
new FloatDimensionSchema("float_dim_0"),
new DoubleDimensionSchema("double_dim_0"),
new StringDimensionSchema("string_dim_1"),
new LongDimensionSchema("long_dim_1"),
new FloatDimensionSchema("float_dim_1"),
new DoubleDimensionSchema("double_dim_1"),
new StringDimensionSchema("string_dim_2"),
new LongDimensionSchema("long_dim_2"),
new FloatDimensionSchema("float_dim_2"),
new DoubleDimensionSchema("double_dim_2"),
new StringDimensionSchema("string_dim_3"),
new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("string_to_double")
),
null,
null
);
}
private static void assertIngestionSchema(
IndexIngestionSpec ingestionSchema,
DimensionsSpec expectedDimensionsSpec
)
{
// assert dataSchema
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
Assert.assertTrue(parser instanceof TransformingInputRowParser);
Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensions()),
new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
);
final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
.stream()
.map(AggregatorFactory::getCombiningFactory)
.collect(Collectors.toSet());
Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
Assert.assertEquals(
new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
dataSchema.getGranularitySpec()
);
// assert ioConfig
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
// check the order of dimensions
Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
// check the order of metrics
Assert.assertEquals(
Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
ingestSegmentFirehoseFactory.getMetrics()
);
// assert tuningConfig
Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
}
private static class TestTaskToolbox extends TaskToolbox
{
private final Map<DataSegment, File> segmentFileMap;
TestTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
Map<DataSegment, File> segmentFileMap
)
{
super(
null,
taskActionClient,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
indexIO,
null,
null,
new IndexMergerV9(objectMapper, indexIO),
null,
null,
null,
null
);
this.segmentFileMap = segmentFileMap;
}
@Override
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
final Map<DataSegment, File> submap = new HashMap<>(segments.size());
for (DataSegment segment : segments) {
final File file = Preconditions.checkNotNull(segmentFileMap.get(segment));
submap.put(segment, file);
}
return submap;
}
}
private static class TestTaskActionClient implements TaskActionClient
{
private final List<DataSegment> segments;
TestTaskActionClient(List<DataSegment> segments)
{
this.segments = segments;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (!(taskAction instanceof SegmentListUsedAction)) {
throw new ISE("action[%s] is not supported", taskAction);
}
return (RetType) segments;
}
}
private static class TestIndexIO extends IndexIO
{
private final Map<File, QueryableIndex> queryableIndexMap;
TestIndexIO(
ObjectMapper mapper,
Map<DataSegment, File> segmentFileMap
)
{
super(mapper, () -> 0);
queryableIndexMap = new HashMap<>(segmentFileMap.size());
for (Entry<DataSegment, File> entry : segmentFileMap.entrySet()) {
final DataSegment segment = entry.getKey();
final List<String> columnNames = new ArrayList<>(segment.getDimensions().size() + segment.getMetrics().size());
columnNames.add(Column.TIME_COLUMN_NAME);
columnNames.addAll(segment.getDimensions());
columnNames.addAll(segment.getMetrics());
final Map<String, Column> columnMap = new HashMap<>(columnNames.size());
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>(segment.getMetrics().size());
for (String columnName : columnNames) {
if (columnName.equals(MIXED_TYPE_COLUMN)) {
columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())));
} else if (DIMENSIONS.containsKey(columnName)) {
columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName)));
} else if (AGGREGATORS.containsKey(columnName)) {
columnMap.put(columnName, createColumn(AGGREGATORS.get(columnName)));
aggregatorFactories.add(AGGREGATORS.get(columnName));
}
}
final Metadata metadata = new Metadata();
metadata.setAggregators(aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]));
metadata.setRollup(false);
queryableIndexMap.put(
entry.getValue(),
new SimpleQueryableIndex(
segment.getInterval(),
new ListIndexed<>(columnNames, String.class),
new ListIndexed<>(segment.getDimensions(), String.class),
null,
columnMap,
null,
metadata
)
);
}
}
@Override
public QueryableIndex loadIndex(File file) throws IOException
{
return queryableIndexMap.get(file);
}
}
private static Column createColumn(DimensionSchema dimensionSchema)
{
return new ColumnBuilder()
.setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()))
.setDictionaryEncodedColumn(() -> null)
.setBitmapIndex(() -> null)
.build();
}
private static Column createColumn(AggregatorFactory aggregatorFactory)
{
return new ColumnBuilder()
.setType(ValueType.fromString(aggregatorFactory.getTypeName()))
.build();
}
}

View File

@ -87,7 +87,11 @@ import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.NumberedPartitionChunk;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.AfterClass;
@ -108,6 +112,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
*
@ -575,6 +581,96 @@ public class IngestSegmentFirehoseFactoryTest
Assert.assertEquals((int) MAX_ROWS, (int) rowcount);
}
@Test
public void testGetUniqueDimensionsAndMetrics()
{
final int numSegmentsPerPartitionChunk = 5;
final int numPartitionChunksPerTimelineObject = 10;
final int numSegments = numSegmentsPerPartitionChunk * numPartitionChunksPerTimelineObject;
final List<DataSegment> segments = new ArrayList<>(numSegments);
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final String version = "1";
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = new ArrayList<>();
for (int i = 0; i < numPartitionChunksPerTimelineObject; i++) {
final List<PartitionChunk<DataSegment>> chunks = new ArrayList<>();
for (int j = 0; j < numSegmentsPerPartitionChunk; j++) {
final List<String> dims = IntStream.range(i, i + numSegmentsPerPartitionChunk)
.mapToObj(suffix -> "dim" + suffix)
.collect(Collectors.toList());
final List<String> metrics = IntStream.range(i, i + numSegmentsPerPartitionChunk)
.mapToObj(suffix -> "met" + suffix)
.collect(Collectors.toList());
final DataSegment segment = new DataSegment(
"ds",
interval,
version,
ImmutableMap.of(),
dims,
metrics,
new NumberedShardSpec(numPartitionChunksPerTimelineObject, i),
1,
1
);
segments.add(segment);
final PartitionChunk<DataSegment> partitionChunk = new NumberedPartitionChunk<>(
i,
numPartitionChunksPerTimelineObject,
segment
);
chunks.add(partitionChunk);
}
final TimelineObjectHolder<String, DataSegment> timelineHolder = new TimelineObjectHolder<>(
interval,
version,
new PartitionHolder<>(chunks)
);
timelineSegments.add(timelineHolder);
}
final String[] expectedDims = new String[]{
"dim9",
"dim10",
"dim11",
"dim12",
"dim13",
"dim8",
"dim7",
"dim6",
"dim5",
"dim4",
"dim3",
"dim2",
"dim1",
"dim0"
};
final String[] expectedMetrics = new String[]{
"met9",
"met10",
"met11",
"met12",
"met13",
"met8",
"met7",
"met6",
"met5",
"met4",
"met3",
"met2",
"met1",
"met0"
};
Assert.assertEquals(
Arrays.asList(expectedDims),
IngestSegmentFirehoseFactory.getUniqueDimensions(timelineSegments, null)
);
Assert.assertEquals(
Arrays.asList(expectedMetrics),
IngestSegmentFirehoseFactory.getUniqueMetrics(timelineSegments)
);
}
private static ServiceEmitter newMockEmitter()
{
return new NoopServiceEmitter();

View File

@ -39,6 +39,7 @@ import org.joda.time.Interval;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class CoordinatorResourceTestClient
@ -80,7 +81,7 @@ public class CoordinatorResourceTestClient
}
// return a list of the segment dates for the specified datasource
public ArrayList<String> getSegmentIntervals(final String dataSource) throws Exception
public List<String> getSegmentIntervals(final String dataSource) throws Exception
{
ArrayList<String> segments = null;
try {

View File

@ -33,8 +33,8 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
public abstract class AbstractIndexerTest
@ -55,7 +55,7 @@ public abstract class AbstractIndexerTest
protected void unloadAndKillData(final String dataSource) throws Exception
{
ArrayList<String> intervals = coordinator.getSegmentIntervals(dataSource);
List<String> intervals = coordinator.getSegmentIntervals(dataSource);
// each element in intervals has this form:
// 2015-12-01T23:15:00.000Z/2015-12-01T23:16:00.000Z

View File

@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.tests.indexer;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.List;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITCompactionTaskTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITCompactionTaskTest.class);
private static String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static String INDEX_DATASOURCE = "wikipedia_index_test";
private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
@Test
public void testCompaction() throws Exception
{
loadData();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
}
try {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
compactData();
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
}
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
private void loadData() throws Exception
{
final String taskID = indexer.submitTask(getTaskAsString(INDEX_TASK));
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
"Segment Load"
);
}
private void compactData() throws Exception
{
final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
RetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE),
"Segment Compaction"
);
}
}

View File

@ -558,7 +558,7 @@
"timestamp": "2013-08-31T01:02:33.000Z",
"result": {
"minTime": "2013-08-31T01:02:33.000Z",
"maxTime": "2013-08-31T12:41:27.000Z"
"maxTime": "2013-09-01T12:41:27.000Z"
}
}
]

View File

@ -0,0 +1,5 @@
{
"type" : "compact",
"dataSource" : "wikipedia_index_test",
"interval" : "2013-08-31/2013-09-02"
}

View File

@ -2,4 +2,9 @@
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}

View File

@ -5,13 +5,15 @@
"queryType" : "timeBoundary",
"dataSource": "wikipedia_index_test"
},
"expectedResults":[ {
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-08-31T12:41:27.000Z"
}
} ]
"expectedResults":[
{
"timestamp" : "2013-08-31T01:02:33.000Z",
"result" : {
"minTime" : "2013-08-31T01:02:33.000Z",
"maxTime" : "2013-09-01T12:41:27.000Z"
}
}
]
},
{

View File

@ -27,7 +27,7 @@
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-01" ]
"intervals" : [ "2013-08-31/2013-09-02" ]
},
"parser": {
"parseSpec": {

View File

@ -28,7 +28,7 @@ import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.SortedSet;
/**
*/
@ -42,9 +42,15 @@ public class JodaUtils
{
ArrayList<Interval> retVal = Lists.newArrayList();
TreeSet<Interval> sortedIntervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
for (Interval interval : intervals) {
sortedIntervals.add(interval);
final SortedSet<Interval> sortedIntervals;
if (intervals instanceof SortedSet) {
sortedIntervals = (SortedSet<Interval>) intervals;
} else {
sortedIntervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
for (Interval interval : intervals) {
sortedIntervals.add(interval);
}
}
if (sortedIntervals.isEmpty()) {

View File

@ -26,6 +26,7 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.segment.ColumnSelectorFactory;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
@ -92,7 +93,12 @@ public abstract class AggregatorFactory implements Cacheable
*/
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
throw new UOE("[%s] does not implement getMergingFactory(..)", this.getClass().getName());
final AggregatorFactory combiningFactory = this.getCombiningFactory();
if (other.getName().equals(this.getName()) && combiningFactory.equals(other.getCombiningFactory())) {
return combiningFactory;
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
/**
@ -153,6 +159,17 @@ public abstract class AggregatorFactory implements Cacheable
return null;
}
if (aggregatorsList.size() == 1) {
final AggregatorFactory[] aggregatorFactories = aggregatorsList.get(0);
if (aggregatorFactories != null) {
final AggregatorFactory[] combiningFactories = new AggregatorFactory[aggregatorFactories.length];
Arrays.setAll(combiningFactories, i -> aggregatorFactories[i].getCombiningFactory());
return combiningFactories;
} else {
return null;
}
}
Map<String, AggregatorFactory> mergedAggregators = new LinkedHashMap<>();
for (AggregatorFactory[] aggregators : aggregatorsList) {
@ -163,7 +180,9 @@ public abstract class AggregatorFactory implements Cacheable
if (mergedAggregators.containsKey(name)) {
AggregatorFactory other = mergedAggregators.get(name);
try {
mergedAggregators.put(name, other.getMergingFactory(aggregator));
// the order of aggregator matters when calling getMergingFactory()
// because it returns a combiningAggregator which can be different from the original aggregator.
mergedAggregators.put(name, aggregator.getMergingFactory(other));
}
catch (AggregatorFactoryNotMergeableException ex) {
log.warn(ex, "failed to merge aggregator factories");

View File

@ -29,7 +29,6 @@ import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
*/
@ -118,29 +117,4 @@ public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleMaxAggregatorFactory that = (DoubleMaxAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -29,7 +29,6 @@ import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
*/
@ -116,29 +115,4 @@ public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleMinAggregatorFactory that = (DoubleMinAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -29,7 +29,6 @@ import io.druid.segment.ColumnSelectorFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
*/
@ -111,30 +110,4 @@ public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DoubleSumAggregatorFactory that = (DoubleSumAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
}

View File

@ -102,6 +102,31 @@ public abstract class SimpleDoubleAggregatorFactory extends AggregatorFactory
return Objects.hash(fieldName, expression, name);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SimpleDoubleAggregatorFactory that = (SimpleDoubleAggregatorFactory) o;
if (!Objects.equals(fieldName, that.fieldName)) {
return false;
}
if (!Objects.equals(expression, that.expression)) {
return false;
}
if (!Objects.equals(name, that.name)) {
return false;
}
return true;
}
@Override
public Comparator getComparator()
{

View File

@ -31,7 +31,6 @@ import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
@ -200,12 +199,6 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name, false, round);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
throw new UnsupportedOperationException("can't merge CardinalityAggregatorFactory");
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -30,7 +30,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -162,16 +161,6 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -30,7 +30,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -160,16 +159,6 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@ -153,16 +152,6 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
@ -153,16 +152,6 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -29,7 +29,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
@ -151,16 +150,6 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -28,7 +28,6 @@ import io.druid.java.util.common.UOE;
import io.druid.query.aggregation.AggregateCombiner;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
@ -149,16 +148,6 @@ public class LongLastAggregatorFactory extends AggregatorFactory
};
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -19,6 +19,7 @@
package io.druid.segment;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.IOPeon;
@ -69,6 +70,14 @@ public interface DimensionHandler
*/
String getDimensionName();
/**
* Get {@link MultiValueHandling} for the column associated with this handler.
* Only string columns can have {@link MultiValueHandling} currently.
*/
default MultiValueHandling getMultivalueHandling()
{
return null;
}
/**
* Creates a new DimensionIndexer, a per-dimension object responsible for processing ingested rows in-memory, used

View File

@ -50,6 +50,12 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
return dimensionName;
}
@Override
public MultiValueHandling getMultivalueHandling()
{
return multiValueHandling;
}
@Override
public int getLengthOfEncodedKeyComponent(int[] dimVals)
{

View File

@ -100,7 +100,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
// Used to discover ValueType based on the class of values in a row
// Also used to convert between the duplicate ValueType enums in DimensionSchema (druid-api) and main druid.
private static final Map<Object, ValueType> TYPE_MAP = ImmutableMap.<Object, ValueType>builder()
public static final Map<Object, ValueType> TYPE_MAP = ImmutableMap.<Object, ValueType>builder()
.put(Long.class, ValueType.LONG)
.put(Double.class, ValueType.DOUBLE)
.put(Float.class, ValueType.FLOAT)

View File

@ -36,6 +36,11 @@ public class TransformingInputRowParser<T> implements InputRowParser<T>
this.transformer = transformSpec.toTransformer();
}
public InputRowParser<T> getParser()
{
return parser;
}
@Override
public InputRow parse(final T row)
{