Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048)

* Make IngestSegmentFirehoseFactory splittable for parallel ingestion

* Code review feedback

- Get rid of WindowedSegment
- Don't document 'segments' parameter or support splitting firehoses that use it
- Require 'intervals' in WindowedSegmentId (since it won't be written by hand)

* Add missing @JsonProperty

* Integration test passes

* Add unit test

* Remove two FIXME comments from CompactionTask

I'd like to leave this PR in a potentially mergeable state, but I still would
appreciate reviewer eyes on the questions I'm removing here.

* Updates from code review
This commit is contained in:
David Glasser 2019-04-02 14:59:17 -07:00 committed by Jihoon Son
parent 78fd5aff21
commit 4e23c11345
10 changed files with 496 additions and 46 deletions

View File

@ -87,7 +87,8 @@ The below configurations can be optionally used for tuning the firehose performa
### IngestSegmentFirehose
This Firehose can be used to read the data from existing druid segments.
It can be used ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
A sample ingest firehose spec is shown below -
```json
@ -106,6 +107,7 @@ A sample ingest firehose spec is shown below -
|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](../querying/filters.html)|no|
|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
#### SqlFirehose

View File

@ -45,7 +45,7 @@ task statuses. If one of them fails, it retries the failed task until the retryi
If all worker tasks succeed, then it collects the reported list of generated segments and publishes those segments at once.
To use this task, the `firehose` in `ioConfig` should be _splittable_. If it's not, this task runs sequentially. The
current splittable fireshoses are [`LocalFirehose`](./firehose.html#localfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
current splittable fireshoses are [`LocalFirehose`](./firehose.html#localfirehose), [`IngestSegmentFirehose`](./firehose.html#ingestsegmentfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
, [`StaticS3Firehose`](../development/extensions-core/s3.html#statics3firehose), [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.html#staticazureblobstorefirehose)
, [`StaticGoogleBlobStoreFirehose`](../development/extensions-contrib/google.html#staticgoogleblobstorefirehose), and [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.html#staticcloudfilesfirehose).

View File

@ -506,10 +506,12 @@ public class CompactionTask extends AbstractTask
new IngestSegmentFirehoseFactory(
dataSchema.getDataSource(),
interval,
null,
null, // no filter
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
null,
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,

View File

@ -31,12 +31,16 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.filter.DimFilter;
@ -51,40 +55,60 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
{
private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024;
private final String dataSource;
// Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
// by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
// batch ingestion.
@Nullable
private final Interval interval;
@Nullable
private final List<WindowedSegmentId> segmentIds;
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final long maxInputSegmentBytesPerTask;
private final IndexIO indexIO;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;
private List<InputSplit<List<WindowedSegmentId>>> splits;
@JsonCreator
public IngestSegmentFirehoseFactory(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") Interval interval,
@Nullable @JsonProperty("interval") Interval interval,
// Specifying "segments" is intended only for when this FirehoseFactory has split itself,
// not for direct end user use.
@Nullable @JsonProperty("segments") List<WindowedSegmentId> segmentIds,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask,
@JacksonInject IndexIO indexIO,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@ -92,18 +116,42 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(interval, "interval");
if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) {
throw new IAE("Specify exactly one of 'interval' and 'segments'");
}
this.dataSource = dataSource;
this.interval = interval;
this.segmentIds = segmentIds;
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
: maxInputSegmentBytesPerTask;
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
}
@Override
public FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> withSplit(InputSplit<List<WindowedSegmentId>> split)
{
return new IngestSegmentFirehoseFactory(
dataSource,
null,
split.get(),
dimFilter,
dimensions,
metrics,
maxInputSegmentBytesPerTask,
indexIO,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
);
}
@JsonProperty
public String getDataSource()
{
@ -116,6 +164,12 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return interval;
}
@JsonProperty
public List<WindowedSegmentId> getSegments()
{
return segmentIds;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
@ -134,50 +188,40 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return metrics;
}
@JsonProperty
public long getMaxInputSegmentBytesPerTask()
{
return maxInputSegmentBytesPerTask;
}
@Override
public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException
{
log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
log.info(
"Connecting firehose: dataSource[%s], interval[%s], segmentIds[%s]",
dataSource,
interval,
segmentIds
);
try {
// This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
// as TaskActionClient.
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
List<DataSegment> usedSegments;
while (true) {
try {
usedSegments =
coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
break;
}
catch (Throwable e) {
log.warn(e, "Exception getting database segments");
final Duration delay = retryPolicy.getAndIncrementRetryDelay();
if (delay == null) {
throw e;
} else {
final long sleepTime = jitter(delay.getMillis());
log.info("Will try again in [%s].", new Duration(sleepTime).toString());
try {
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
throw new RuntimeException(e2);
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
// Download all segments locally.
// Note: this requires enough local storage space to fit all of the segments, even though
// IngestSegmentFirehose iterates over the segments in series. We may want to change this
// to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
final DataSegment segment = chunk.getObject();
if (!segmentFileMap.containsKey(segment)) {
segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
}
}
}
final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
for (DataSegment segment : usedSegments) {
segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(interval);
final List<String> dims;
if (dimensions != null) {
dims = dimensions;
@ -250,6 +294,179 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return retval < 0 ? 0 : retval;
}
private List<TimelineObjectHolder<String, DataSegment>> getTimeline()
{
if (interval == null) {
return getTimelineForSegmentIds();
} else {
return getTimelineForInterval();
}
}
private List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval()
{
Preconditions.checkNotNull(interval);
// This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
// as TaskActionClient.
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
List<DataSegment> usedSegments;
while (true) {
try {
usedSegments =
coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
break;
}
catch (Throwable e) {
log.warn(e, "Exception getting database segments");
final Duration delay = retryPolicy.getAndIncrementRetryDelay();
if (delay == null) {
throw e;
} else {
final long sleepTime = jitter(delay.getMillis());
log.info("Will try again in [%s].", new Duration(sleepTime).toString());
try {
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
throw new RuntimeException(e2);
}
}
}
}
return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
}
private List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds()
{
final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> timeline = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) {
final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment(
dataSource,
windowedSegmentId.getSegmentId()
);
for (Interval interval : windowedSegmentId.getIntervals()) {
final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(interval);
if (existingHolder != null) {
if (!existingHolder.getVersion().equals(segment.getVersion())) {
throw new ISE("Timeline segments with the same interval should have the same version: " +
"existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment);
}
existingHolder.getObject().add(segment.getShardSpec().createChunk(segment));
} else {
timeline.put(interval, new TimelineObjectHolder<>(
interval,
segment.getInterval(),
segment.getVersion(),
new PartitionHolder<DataSegment>(segment.getShardSpec().createChunk(segment))
));
}
}
}
// Validate that none of the given windows overlaps (except for when multiple segments share exactly the
// same interval).
Interval lastInterval = null;
for (Interval interval : timeline.keySet()) {
if (lastInterval != null) {
if (interval.overlaps(lastInterval)) {
throw new IAE(
"Distinct intervals in input segments may not overlap: [%s] vs [%s]",
lastInterval,
interval
);
}
}
lastInterval = interval;
}
return new ArrayList<>(timeline.values());
}
private void initializeSplitsIfNeeded()
{
if (splits != null) {
return;
}
// isSplittable() ensures this is only called when we have an interval.
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval();
// We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing
// problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their
// data can combine with each other anyway.
List<InputSplit<List<WindowedSegmentId>>> newSplits = new ArrayList<>();
List<WindowedSegmentId> currentSplit = new ArrayList<>();
Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>();
long bytesInCurrentSplit = 0;
for (TimelineObjectHolder<String, DataSegment> timelineHolder : timelineSegments) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
final DataSegment segment = chunk.getObject();
final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment);
if (existingWindowedSegmentId != null) {
// We've already seen this segment in the timeline, so just add this interval to it. It has already
// been placed into a split.
existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval());
} else {
// It's the first time we've seen this segment, so create a new WindowedSegmentId.
List<Interval> intervals = new ArrayList<>();
// Use the interval that contributes to the timeline, not the entire segment's true interval.
intervals.add(timelineHolder.getInterval());
final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals);
windowedSegmentIds.put(segment, newWindowedSegmentId);
// Now figure out if it goes in the current split or not.
final long segmentBytes = segment.getSize();
if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) {
// This segment won't fit in the current non-empty split, so this split is done.
newSplits.add(new InputSplit<>(currentSplit));
currentSplit = new ArrayList<>();
bytesInCurrentSplit = 0;
}
if (segmentBytes > maxInputSegmentBytesPerTask) {
// If this segment is itself bigger than our max, just put it in its own split.
Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0);
newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId)));
} else {
currentSplit.add(newWindowedSegmentId);
bytesInCurrentSplit += segmentBytes;
}
}
}
}
if (!currentSplit.isEmpty()) {
newSplits.add(new InputSplit<>(currentSplit));
}
splits = newSplits;
}
@Override
public boolean isSplittable()
{
// Specifying 'segments' to this factory instead of 'interval' is intended primarily for internal use by
// parallel batch injection: we don't need to support splitting a list of segments.
return interval != null;
}
@Override
public Stream<InputSplit<List<WindowedSegmentId>>> getSplits()
{
initializeSplitsIfNeeded();
return splits.stream();
}
@Override
public int getNumSplits()
{
initializeSplitsIfNeeded();
return splits.size();
}
@VisibleForTesting
static List<String> getUniqueDimensions(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;
import java.util.List;
/**
* A WindowedSegment represents a segment plus the list of intervals inside it which contribute to a timeline.
* <p>
* This class is intended for serialization in specs.
*/
public class WindowedSegmentId
{
// This is of the form used by SegmentId.
private final String segmentId;
private final List<Interval> intervals;
@JsonCreator
public WindowedSegmentId(
@JsonProperty("segmentId") String segmentId,
@JsonProperty("intervals") List<Interval> intervals
)
{
this.segmentId = Preconditions.checkNotNull(segmentId, "null segmentId");
this.intervals = Preconditions.checkNotNull(intervals, "null intervals");
}
@JsonProperty
public String getSegmentId()
{
return segmentId;
}
@JsonProperty
public List<Interval> getIntervals()
{
return intervals;
}
}

View File

@ -197,9 +197,11 @@ public class IngestSegmentFirehoseFactoryTest
final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory(
TASK.getDataSource(),
Intervals.ETERNITY,
null,
new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
dim_names,
metric_names,
null,
INDEX_IO,
cc,
slf,

View File

@ -26,8 +26,10 @@ import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
@ -72,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class IngestSegmentFirehoseFactoryTimelineTest
@ -101,6 +104,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private final File tmpDir;
private final int expectedCount;
private final long expectedSum;
private final int segmentCount;
private static final ObjectMapper MAPPER;
private static final IndexIO INDEX_IO;
@ -118,17 +122,28 @@ public class IngestSegmentFirehoseFactoryTimelineTest
IngestSegmentFirehoseFactory factory,
File tmpDir,
int expectedCount,
long expectedSum
long expectedSum,
int segmentCount
)
{
this.factory = factory;
this.tmpDir = tmpDir;
this.expectedCount = expectedCount;
this.expectedSum = expectedSum;
this.segmentCount = segmentCount;
}
@Test
public void testSimple() throws Exception
public void test() throws Exception
{
// Junit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized
// class run. (Junit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running
// "tests" in series inside one @Test.
testSimple();
testSplit();
}
private void testSimple() throws Exception
{
int count = 0;
long sum = 0;
@ -145,6 +160,36 @@ public class IngestSegmentFirehoseFactoryTimelineTest
Assert.assertEquals("sum", expectedSum, sum);
}
private void testSplit() throws Exception
{
Assert.assertTrue(factory.isSplittable());
final int numSplits = factory.getNumSplits();
// We set maxInputSegmentBytesPerSplit to 2 so each segment should become a byte.
Assert.assertEquals(segmentCount, numSplits);
final List<InputSplit<List<WindowedSegmentId>>> splits =
factory.getSplits().collect(Collectors.toList());
Assert.assertEquals(numSplits, splits.size());
int count = 0;
long sum = 0;
for (InputSplit<List<WindowedSegmentId>> split : splits) {
final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
factory.withSplit(split);
try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
sum += row.getMetric(METRICS[0]).longValue();
}
}
}
Assert.assertEquals("count", expectedCount, count);
Assert.assertEquals("sum", expectedSum, sum);
}
@After
public void tearDown() throws Exception
{
@ -285,13 +330,26 @@ public class IngestSegmentFirehoseFactoryTimelineTest
throw new IllegalArgumentException("WTF");
}
}
@Override
public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
{
return testCase.segments
.stream()
.filter(s -> s.getId().toString().equals(segmentId))
.findAny()
.get(); // throwing if not found is exactly what the real code does
}
};
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
DATA_SOURCE,
testCase.interval,
null,
new TrueDimFilter(),
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
// Split as much as possible
1L,
INDEX_IO,
cc,
slf,
@ -304,7 +362,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
factory,
testCase.tmpDir,
testCase.expectedCount,
testCase.expectedSum
testCase.expectedSum,
testCase.segments.size()
}
);
}
@ -384,7 +443,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
Arrays.asList(METRICS),
new LinearShardSpec(partitionNum),
-1,
0L
2L
);
}
}

View File

@ -33,11 +33,16 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
private static String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";
private static String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json";
private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
private static String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test";
private static String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json";
@Test
public void testIndexData() throws Exception
{
try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) {
try (final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
final Closeable ingestSegmentCloseable = unloader(
INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix());
) {
doIndexTestTest(
INDEX_DATASOURCE,
INDEX_TASK,
@ -53,6 +58,13 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
REINDEX_QUERIES_RESOURCE,
true
);
doReindexTest(
INDEX_DATASOURCE,
INDEX_INGEST_SEGMENT_DATASOURCE,
INDEX_INGEST_SEGMENT_TASK,
REINDEX_QUERIES_RESOURCE
);
}
}
}

View File

@ -0,0 +1,63 @@
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "%%REINDEX_DATASOURCE%%",
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals": [
"2013-08-31/2013-09-02"
]
},
"parser": {
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensionExclusions": [
"robot",
"continent"
]
}
}
}
},
"ioConfig": {
"type": "index_parallel",
"firehose": {
"type": "ingestSegment",
"dataSource": "%%DATASOURCE%%",
"interval": "2013-08-31/2013-09-02",
"maxInputSegmentBytesPerTask": 1
}
},
"tuningConfig": {
"type": "index_parallel",
"maxNumSubTasks": 10
}
}
}

View File

@ -156,4 +156,36 @@ public class CoordinatorClient
throw new RuntimeException(e);
}
}
public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
{
try {
FullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(
HttpMethod.GET,
StringUtils.format(
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
StringUtils.urlEncode(dataSource),
StringUtils.urlEncode(segmentId)
)
)
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while fetching database segment data source segment status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
return jsonMapper.readValue(
response.getContent(), new TypeReference<DataSegment>()
{
}
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}