Merge pull request #1681 from gianm/ingest-segment-overlapping

Fix overlapping segments in IngestSegmentFirehose, DatasourceInputFormat
This commit is contained in:
Himanshu 2015-08-28 11:12:51 -05:00
commit ceaa49ec4f
19 changed files with 1038 additions and 266 deletions

View File

@ -20,11 +20,17 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import java.io.IOException;
import java.util.List;
@ -116,25 +122,40 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
Map<String, Object> pathSpec = spec.getIOConfig().getPathSpec();
Map<String, Object> datasourcePathSpec = null;
if(pathSpec.get(type).equals(dataSource)) {
if (pathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = pathSpec;
} else if(pathSpec.get(type).equals(multi)) {
} else if (pathSpec.get(type).equals(multi)) {
List<Map<String, Object>> childPathSpecs = (List<Map<String, Object>>) pathSpec.get(children);
for(Map<String, Object> childPathSpec : childPathSpecs) {
for (Map<String, Object> childPathSpec : childPathSpecs) {
if (childPathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = childPathSpec;
break;
}
}
}
if (datasourcePathSpec != null) {
Map<String, Object> ingestionSpecMap = (Map<String, Object>) datasourcePathSpec.get(ingestionSpec);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(
ingestionSpecMap,
DatasourceIngestionSpec.class
);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
);
datasourcePathSpec.put(segments, segmentsList);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
for (DataSegment segment : segmentsList) {
timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(ingestionSpecObj.getInterval());
final List<WindowedDataSegment> windowedSegments = Lists.newArrayList();
for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
windowedSegments.add(new WindowedDataSegment(chunk.getObject(), holder.getInterval()));
}
}
datasourcePathSpec.put(segments, windowedSegments);
}
return spec;

View File

@ -20,7 +20,6 @@
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
@ -56,9 +55,9 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
Configuration conf = context.getConfiguration();
String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
List<DataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
segmentsStr,
new TypeReference<List<DataSegment>>()
new TypeReference<List<WindowedDataSegment>>()
{
}
);
@ -75,12 +74,12 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
//are combined appropriately
Collections.sort(
segments,
new Comparator<DataSegment>()
new Comparator<WindowedDataSegment>()
{
@Override
public int compare(DataSegment s1, DataSegment s2)
public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
{
return Long.compare(s1.getSize(), s2.getSize());
return Long.compare(s1.getSegment().getSize(), s2.getSegment().getSize());
}
}
);
@ -88,18 +87,18 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
List<InputSplit> splits = Lists.newArrayList();
List<DataSegment> list = new ArrayList<>();
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;
for (DataSegment segment : segments) {
if (size + segment.getSize() > maxSize && size > 0) {
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
list = Lists.newArrayList();
size = 0;
}
list.add(segment);
size += segment.getSize();
size += segment.getSegment().getSize();
}
if (list.size() > 0) {

View File

@ -19,30 +19,28 @@
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import javax.validation.constraints.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import javax.validation.constraints.NotNull;
public class DatasourceInputSplit extends InputSplit implements Writable
{
private List<DataSegment> segments = null;
private List<WindowedDataSegment> segments = null;
//required for deserialization
public DatasourceInputSplit()
{
}
public DatasourceInputSplit(@NotNull List<DataSegment> segments)
public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
@ -52,8 +50,8 @@ public class DatasourceInputSplit extends InputSplit implements Writable
public long getLength() throws IOException, InterruptedException
{
long size = 0;
for (DataSegment segment : segments) {
size += segment.getSize();
for (WindowedDataSegment segment : segments) {
size += segment.getSegment().getSize();
}
return size;
}
@ -64,7 +62,7 @@ public class DatasourceInputSplit extends InputSplit implements Writable
return new String[]{};
}
public List<DataSegment> getSegments()
public List<WindowedDataSegment> getSegments()
{
return segments;
}
@ -80,7 +78,7 @@ public class DatasourceInputSplit extends InputSplit implements Writable
{
segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
in.readUTF(),
new TypeReference<List<DataSegment>>()
new TypeReference<List<WindowedDataSegment>>()
{
}
);

View File

@ -36,9 +36,8 @@ import io.druid.indexer.JobHelper;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -70,18 +69,18 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
{
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper);
List<DataSegment> segments = ((DatasourceInputSplit) split).getSegments();
List<WindowedDataSegment> segments = ((DatasourceInputSplit) split).getSegments();
List<StorageAdapter> adapters = Lists.transform(
List<WindowedStorageAdapter> adapters = Lists.transform(
segments,
new Function<DataSegment, StorageAdapter>()
new Function<WindowedDataSegment, WindowedStorageAdapter>()
{
@Override
public StorageAdapter apply(DataSegment segment)
public WindowedStorageAdapter apply(WindowedDataSegment segment)
{
try {
logger.info("Getting storage path for segment [%s]", segment.getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment));
logger.info("Getting storage path for segment [%s]", segment.getSegment().getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment.getSegment()));
logger.info("Fetch segment files from [%s]", path);
@ -96,7 +95,10 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
indexes.add(index);
numRows += index.getNumRows();
return new QueryableIndexStorageAdapter(index);
return new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(index),
segment.getInterval()
);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
@ -110,7 +112,6 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
spec.getDimensions(),
spec.getMetrics(),
spec.getFilter(),
spec.getInterval(),
spec.getGranularity()
);

View File

@ -0,0 +1,89 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.Objects;
public class WindowedDataSegment
{
private final DataSegment segment;
private final Interval interval;
@JsonCreator
public WindowedDataSegment(
@JsonProperty("segment") final DataSegment segment,
@JsonProperty("interval") final Interval interval
)
{
this.segment = segment;
this.interval = interval;
}
@JsonProperty
public DataSegment getSegment()
{
return segment;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WindowedDataSegment that = (WindowedDataSegment) o;
return Objects.equals(segment, that.segment) &&
Objects.equals(interval, that.interval);
}
@Override
public int hashCode()
{
return Objects.hash(segment, interval);
}
@Override
public String toString()
{
return "WindowedDataSegment{" +
"segment=" + segment +
", interval=" + interval +
'}';
}
public static WindowedDataSegment of(final DataSegment segment)
{
return new WindowedDataSegment(segment, segment.getInterval());
}
}

View File

@ -20,6 +20,7 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
@ -31,8 +32,8 @@ import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
@ -48,11 +49,12 @@ public class DatasourcePathSpec implements PathSpec
private final ObjectMapper mapper;
private final DatasourceIngestionSpec ingestionSpec;
private final long maxSplitSize;
private final List<DataSegment> segments;
private final List<WindowedDataSegment> segments;
@JsonCreator
public DatasourcePathSpec(
@JacksonInject ObjectMapper mapper,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("segments") List<WindowedDataSegment> segments,
@JsonProperty("ingestionSpec") DatasourceIngestionSpec spec,
@JsonProperty("maxSplitSize") Long maxSplitSize
)
@ -61,7 +63,7 @@ public class DatasourcePathSpec implements PathSpec
this.segments = segments;
this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec");
if(maxSplitSize == null) {
if (maxSplitSize == null) {
this.maxSplitSize = 0;
} else {
this.maxSplitSize = maxSplitSize.longValue();
@ -69,7 +71,7 @@ public class DatasourcePathSpec implements PathSpec
}
@JsonProperty
public List<DataSegment> getSegments()
public List<WindowedDataSegment> getSegments()
{
return segments;
}
@ -110,12 +112,12 @@ public class DatasourcePathSpec implements PathSpec
Iterables.concat(
Iterables.transform(
segments,
new Function<DataSegment, Iterable<String>>()
new Function<WindowedDataSegment, Iterable<String>>()
{
@Override
public Iterable<String> apply(DataSegment dataSegment)
public Iterable<String> apply(WindowedDataSegment dataSegment)
{
return dataSegment.getDimensions();
return dataSegment.getSegment().getDimensions();
}
}
)

View File

@ -33,6 +33,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.JobHelper;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.timeline.DataSegment;
@ -476,7 +477,7 @@ public class HadoopConverterJob
final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
final File tmpDir = Paths.get(tmpDirLoc).toFile();
final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments());
final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments()).getSegment();
final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
@ -584,7 +585,7 @@ public class HadoopConverterJob
@Override
public InputSplit apply(DataSegment input)
{
return new DatasourceInputSplit(ImmutableList.of(input));
return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)));
}
}
);

View File

@ -22,10 +22,11 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
@ -34,8 +35,7 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -48,6 +48,7 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.commons.io.FileUtils;
@ -65,42 +66,45 @@ import java.util.Map;
public class BatchDeltaIngestionTest
{
public final
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private ObjectMapper mapper;
private Interval interval;
private List<DataSegment> segments;
private static final ObjectMapper MAPPER;
private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
private static final Interval INTERVAL_PARTIAL = new Interval("2014-10-22T00:00:00Z/PT2H");
private static final DataSegment SEGMENT;
public BatchDeltaIngestionTest() throws IOException
{
mapper = new DefaultObjectMapper();
mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper);
mapper.setInjectableValues(inject);
static {
MAPPER = new DefaultObjectMapper();
MAPPER.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, MAPPER);
MAPPER.setInjectableValues(inject);
this.interval = new Interval("2014-10-22T00:00:00Z/P1D");
segments = ImmutableList.of(
new DefaultObjectMapper()
.readValue(
this.getClass().getClassLoader().getResource("test-segment/descriptor.json"),
DataSegment.class
)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
)
);
try {
SEGMENT = new DefaultObjectMapper()
.readValue(
BatchDeltaIngestionTest.class.getClassLoader().getResource("test-segment/descriptor.json"),
DataSegment.class
)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
BatchDeltaIngestionTest.class.getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@Test
public void testReindexing() throws Exception
{
List<WindowedDataSegment> segments = ImmutableList.of(new WindowedDataSegment(SEGMENT, INTERVAL_FULL));
HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
ImmutableMap.<String, Object>of(
"type",
@ -110,7 +114,7 @@ public class BatchDeltaIngestionTest
"dataSource",
"xyz",
"interval",
interval
INTERVAL_FULL
),
"segments",
segments
@ -139,7 +143,47 @@ public class BatchDeltaIngestionTest
)
);
testIngestion(config, expectedRows);
testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
}
@Test
public void testReindexingWithPartialWindow() throws Exception
{
List<WindowedDataSegment> segments = ImmutableList.of(new WindowedDataSegment(SEGMENT, INTERVAL_PARTIAL));
HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
ImmutableMap.<String, Object>of(
"type",
"dataSource",
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"interval",
INTERVAL_FULL
),
"segments",
segments
),
temporaryFolder.newFolder()
);
List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum", 100L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum", 150L,
"unique_hosts", 1.0d
)
);
testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
}
@Test
@ -169,6 +213,8 @@ public class BatchDeltaIngestionTest
//https://issues.apache.org/jira/browse/MAPREDUCE-5061
String inputPath = tmpDir.getPath() + "/{data1,data2}";
List<WindowedDataSegment> segments = ImmutableList.of(new WindowedDataSegment(SEGMENT, INTERVAL_FULL));
HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
ImmutableMap.<String, Object>of(
"type",
@ -183,7 +229,7 @@ public class BatchDeltaIngestionTest
"dataSource",
"xyz",
"interval",
interval
INTERVAL_FULL
),
"segments",
segments
@ -220,11 +266,14 @@ public class BatchDeltaIngestionTest
)
);
testIngestion(config, expectedRows);
testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
}
private void testIngestion(HadoopDruidIndexerConfig config, List<ImmutableMap<String, Object>> expectedRowsGenerated)
throws Exception
private void testIngestion(
HadoopDruidIndexerConfig config,
List<ImmutableMap<String, Object>> expectedRowsGenerated,
WindowedDataSegment windowedDataSegment
) throws Exception
{
IndexGeneratorJob job = new LegacyIndexGeneratorJob(config);
JobHelper.runJobs(ImmutableList.<Jobby>of(job), config);
@ -234,8 +283,8 @@ public class BatchDeltaIngestionTest
"%s/%s/%s_%s/%s/0",
config.getSchema().getIOConfig().getSegmentOutputPath(),
config.getSchema().getDataSchema().getDataSource(),
interval.getStart().toString(),
interval.getEnd().toString(),
INTERVAL_FULL.getStart().toString(),
INTERVAL_FULL.getEnd().toString(),
config.getSchema().getTuningConfig().getVersion()
)
);
@ -247,10 +296,10 @@ public class BatchDeltaIngestionTest
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
DataSegment dataSegment = MAPPER.readValue(descriptor, DataSegment.class);
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(interval, dataSegment.getInterval());
Assert.assertEquals(INTERVAL_FULL, dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
@ -269,11 +318,10 @@ public class BatchDeltaIngestionTest
StorageAdapter adapter = new QueryableIndexStorageAdapter(index);
Firehose firehose = new IngestSegmentFirehose(
ImmutableList.of(adapter),
ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
null,
interval,
QueryGranularity.NONE
);
@ -305,7 +353,7 @@ public class BatchDeltaIngestionTest
new HyperUniquesAggregatorFactory("unique_hosts", "host2")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(INTERVAL_FULL)
)
),
new HadoopIOConfig(
@ -337,7 +385,7 @@ public class BatchDeltaIngestionTest
config.setShardSpecs(
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(
interval.getStart(),
INTERVAL_FULL.getStart(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.jsonMapper),
@ -352,6 +400,7 @@ public class BatchDeltaIngestionTest
private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<InputRow> actualRows)
{
System.out.println("actualRows = " + actualRows);
Assert.assertEquals(expectedRows.size(), actualRows.size());
for (int i = 0; i < expectedRows.size(); i++) {

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.MultiplePathSpec;
import io.druid.indexer.path.PathSpec;
@ -40,7 +41,6 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
@ -48,32 +48,30 @@ import java.util.Map;
public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
{
private final String testDatasource = "test";
private final Interval testDatasourceInterval = new Interval("1970/2000");
private final Interval testDatasourceInterval = new Interval("1970/3000");
private final Interval testDatasourceIntervalPartial = new Interval("2050/3000");
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final List<DataSegment> segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
)
private static final DataSegment SEGMENT = new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
);
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception
{
PathSpec pathSpec = new StaticPathSpec("/xyz", null);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec, null);
Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec);
}
@ -86,8 +84,34 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertEquals(segments, ((DatasourcePathSpec) config.getPathSpec()).getSegments());
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
Assert.assertEquals(
ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
((DatasourcePathSpec) config.getPathSpec()).getSegments()
);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpecAndPartialInterval()
throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceIntervalPartial
);
Assert.assertEquals(
ImmutableList.of(new WindowedDataSegment(SEGMENT, testDatasourceIntervalPartial)),
((DatasourcePathSpec) config.getPathSpec()).getSegments()
);
}
@Test
@ -104,15 +128,19 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
)
)
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,
testDatasourceInterval
);
Assert.assertEquals(
segments,
ImmutableList.of(WindowedDataSegment.of(SEGMENT)),
((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments()
);
}
private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
PathSpec datasourcePathSpec
PathSpec datasourcePathSpec,
Interval jobInterval
)
throws Exception
{
@ -144,8 +172,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class);
EasyMock.expect(
segmentLister.getUsedSegmentsForInterval(testDatasource, testDatasourceInterval)
).andReturn(segments);
segmentLister.getUsedSegmentsForInterval(testDatasource, jobInterval)
).andReturn(ImmutableList.of(SEGMENT));
EasyMock.replay(segmentLister);
spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister);

View File

@ -40,7 +40,7 @@ import java.util.List;
*/
public class DatasourceInputFormatTest
{
private List<DataSegment> segments;
private List<WindowedDataSegment> segments;
private Configuration config;
private JobContext context;
@ -48,47 +48,53 @@ public class DatasourceInputFormatTest
public void setUp() throws Exception
{
segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
WindowedDataSegment.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
)
),
new DataSegment(
"test2",
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index2.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
11
WindowedDataSegment.of(
new DataSegment(
"test2",
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index2.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
11
)
),
new DataSegment(
"test3",
Interval.parse("2030/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index3.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
4
WindowedDataSegment.of(
new DataSegment(
"test3",
Interval.parse("2030/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index3.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
4
)
)
);

View File

@ -39,21 +39,25 @@ public class DatasourceInputSplitTest
@Test
public void testSerde() throws Exception
{
Interval interval = Interval.parse("2000/3000");
DatasourceInputSplit expected = new DatasourceInputSplit(
Lists.newArrayList(
new DataSegment(
"test",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
new WindowedDataSegment(
new DataSegment(
"test",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
interval
)
)
);

View File

@ -55,7 +55,7 @@ public class DatasourceRecordReaderTest
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment));
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)));
Configuration config = new Configuration();
config.set(

View File

@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class WindowedDataSegmentTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
private static final DataSegment SEGMENT = new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
);
@Test
public void testSerdeFullWindow() throws IOException
{
final WindowedDataSegment windowedDataSegment = WindowedDataSegment.of(SEGMENT);
final WindowedDataSegment roundTrip = MAPPER.readValue(
MAPPER.writeValueAsBytes(windowedDataSegment),
WindowedDataSegment.class
);
Assert.assertEquals(windowedDataSegment, roundTrip);
Assert.assertEquals(SEGMENT, roundTrip.getSegment());
Assert.assertEquals(SEGMENT.getInterval(), roundTrip.getInterval());
}
@Test
public void testSerdePartialWindow() throws IOException
{
final Interval partialInterval = new Interval("2500/3000");
final WindowedDataSegment windowedDataSegment = new WindowedDataSegment(SEGMENT, partialInterval);
final WindowedDataSegment roundTrip = MAPPER.readValue(
MAPPER.writeValueAsBytes(windowedDataSegment),
WindowedDataSegment.class
);
Assert.assertEquals(windowedDataSegment, roundTrip);
Assert.assertEquals(SEGMENT, roundTrip.getSegment());
Assert.assertEquals(partialInterval, roundTrip.getInterval());
}
}

View File

@ -42,6 +42,7 @@ import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
@ -65,7 +66,7 @@ import java.util.List;
public class DatasourcePathSpecTest
{
private DatasourceIngestionSpec ingestionSpec;
private List<DataSegment> segments;
private List<WindowedDataSegment> segments;
public DatasourcePathSpecTest()
{
@ -79,33 +80,37 @@ public class DatasourcePathSpecTest
);
segments = ImmutableList.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
WindowedDataSegment.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
)
),
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
WindowedDataSegment.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
)
)
);
}
@ -201,7 +206,6 @@ public class DatasourcePathSpecTest
);
ObjectMapper mapper = new DefaultObjectMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec(
@ -217,19 +221,24 @@ public class DatasourcePathSpecTest
EasyMock.replay(job);
pathSpec.addInputPaths(hadoopIndexerConfig, job);
List<DataSegment> actualSegments = mapper.readValue(
List<WindowedDataSegment> actualSegments = mapper.readValue(
config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS),
new TypeReference<List<DataSegment>>()
new TypeReference<List<WindowedDataSegment>>()
{
}
);
Assert.assertEquals(segments, actualSegments);
DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), DatasourceIngestionSpec.class);
Assert.assertEquals(ingestionSpec
.withDimensions(ImmutableList.of("product"))
.withMetrics(ImmutableList.of("visited_sum")),
actualIngestionSpec);
DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(
config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA),
DatasourceIngestionSpec.class
);
Assert.assertEquals(
ingestionSpec
.withDimensions(ImmutableList.of("product"))
.withMetrics(ImmutableList.of("visited_sum")),
actualIngestionSpec
);
}
}

View File

@ -41,9 +41,9 @@ import io.druid.indexing.common.task.NoopTask;
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -131,7 +131,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.<String>natural().nullsFirst()
);
@ -142,7 +142,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
interval
);
List<String> dims;
final List<String> dims;
if (dimensions != null) {
dims = dimensions;
} else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
@ -189,7 +189,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
);
}
List<String> metricsList;
final List<String> metricsList;
if (metrics != null) {
metricsList = metrics;
} else {
@ -226,34 +226,35 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
}
final List<StorageAdapter> adapters = Lists.newArrayList(
final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
Iterables.concat(
Iterables.transform(
timeLineSegments,
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<StorageAdapter>>()
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
{
@Override
public Iterable<StorageAdapter> apply(
TimelineObjectHolder<String, DataSegment> input
)
public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
{
return
Iterables.transform(
input.getObject(),
new Function<PartitionChunk<DataSegment>, StorageAdapter>()
holder.getObject(),
new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
{
@Override
public StorageAdapter apply(PartitionChunk<DataSegment> input)
public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
{
final DataSegment segment = input.getObject();
try {
return new QueryableIndexStorageAdapter(
IndexIO.loadIndex(
Preconditions.checkNotNull(
segmentFileMap.get(segment),
"File for segment %s", segment.getIdentifier()
return new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(
IndexIO.loadIndex(
Preconditions.checkNotNull(
segmentFileMap.get(segment),
"File for segment %s", segment.getIdentifier()
)
)
)
),
holder.getInterval()
);
}
catch (IOException e) {
@ -268,8 +269,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
)
);
return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, interval, QueryGranularity.NONE);
return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, QueryGranularity.NONE);
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -179,37 +179,7 @@ public class IngestSegmentFirehoseFactoryTest
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter())
);
final ObjectMapper objectMapper = new DefaultObjectMapper();
objectMapper.registerModule(
new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class)
);
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
objectMapper.setInjectableValues(
new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(LocalDataSegmentPuller.class);
}
}
)
)
)
);
final ObjectMapper objectMapper = newObjectMapper();
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null),
tac,
@ -332,6 +302,41 @@ public class IngestSegmentFirehoseFactoryTest
return values;
}
public static ObjectMapper newObjectMapper()
{
final ObjectMapper objectMapper = new DefaultObjectMapper();
objectMapper.registerModule(
new SimpleModule("testModule").registerSubtypes(LocalLoadSpec.class)
);
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
objectMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
objectMapper.setInjectableValues(
new GuiceInjectableValues(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(LocalDataSegmentPuller.class);
}
}
)
)
)
);
return objectMapper;
}
public IngestSegmentFirehoseFactoryTest(
IngestSegmentFirehoseFactory factory,
String testName,

View File

@ -0,0 +1,423 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.api.client.util.Sets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.NoopDimFilter;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@RunWith(Parameterized.class)
public class IngestSegmentFirehoseFactoryTimelineTest
{
private static final String DATA_SOURCE = "foo";
private static final String TIME_COLUMN = "t";
private static final String[] DIMENSIONS = new String[]{"d1"};
private static final String[] METRICS = new String[]{"m1"};
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
Arrays.asList(DIMENSIONS),
null,
null
)
)
);
private final IngestSegmentFirehoseFactory factory;
private final File tmpDir;
private final int expectedCount;
private final long expectedSum;
public IngestSegmentFirehoseFactoryTimelineTest(
String name,
IngestSegmentFirehoseFactory factory,
File tmpDir,
int expectedCount,
long expectedSum
)
{
this.factory = factory;
this.tmpDir = tmpDir;
this.expectedCount = expectedCount;
this.expectedSum = expectedSum;
}
@Test
public void testSimple() throws Exception
{
int count = 0;
long sum = 0;
try (final Firehose firehose = factory.connect(ROW_PARSER)) {
while (firehose.hasMore()) {
final InputRow row = firehose.nextRow();
count++;
sum += row.getLongMetric(METRICS[0]);
}
}
Assert.assertEquals("count", expectedCount, count);
Assert.assertEquals("sum", expectedSum, sum);
}
@After
public void tearDown() throws Exception
{
FileUtils.deleteDirectory(tmpDir);
}
private static TestCase TC(
String intervalString,
int expectedCount,
long expectedSum,
DataSegmentMaker... segmentMakers
)
{
final File tmpDir = Files.createTempDir();
final Set<DataSegment> segments = Sets.newHashSet();
for (DataSegmentMaker segmentMaker : segmentMakers) {
segments.add(segmentMaker.make(tmpDir));
}
return new TestCase(
tmpDir,
new Interval(intervalString),
expectedCount,
expectedSum,
segments
);
}
private static DataSegmentMaker DS(
String intervalString,
String version,
int partitionNum,
InputRow... rows
)
{
return new DataSegmentMaker(new Interval(intervalString), version, partitionNum, Arrays.asList(rows));
}
private static InputRow IR(String timeString, long metricValue)
{
return new MapBasedInputRow(
new DateTime(timeString).getMillis(),
Arrays.asList(DIMENSIONS),
ImmutableMap.<String, Object>of(
TIME_COLUMN, new DateTime(timeString).toString(),
DIMENSIONS[0], "bar",
METRICS[0], metricValue
)
);
}
private static Map<String, Object> persist(File tmpDir, InputRow... rows)
{
final File persistDir = new File(tmpDir, UUID.randomUUID().toString());
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularity.NONE)
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRICS[0], METRICS[0])
}
)
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, rows.length);
for (InputRow row : rows) {
try {
index.add(row);
}
catch (IndexSizeExceededException e) {
throw Throwables.propagate(e);
}
}
try {
IndexMerger.persist(index, persistDir, null, new IndexSpec());
}
catch (IOException e) {
throw Throwables.propagate(e);
}
return ImmutableMap.<String, Object>of(
"type", "local",
"path", persistDir.getAbsolutePath()
);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
{
final List<TestCase> testCases = ImmutableList.of(
TC(
"2000/2000T02", 3, 7,
DS("2000/2000T01", "v1", 0, IR("2000", 1), IR("2000T00:01", 2)),
DS("2000T01/2000T02", "v1", 0, IR("2000T01", 4))
) /* Adjacent segments */,
TC(
"2000/2000T02", 3, 7,
DS("2000/2000T02", "v1", 0, IR("2000", 1), IR("2000T00:01", 2), IR("2000T01", 8)),
DS("2000T01/2000T02", "v2", 0, IR("2000T01:01", 4))
) /* 1H segment overlayed on top of 2H segment */,
TC(
"2000/2000T02", 4, 15,
DS("2000/2000T02", "v1", 0, IR("2000", 1), IR("2000T00:01", 2), IR("2000T01", 8)),
DS("2000/2000T02", "v1", 1, IR("2000T01:01", 4))
) /* Segment set with two segments for the same interval */,
TC(
"2000T01/2000T02", 1, 2,
DS("2000/2000T03", "v1", 0, IR("2000", 1), IR("2000T01", 2), IR("2000T02", 4))
) /* Segment wider than desired interval */,
TC(
"2000T02/2000T04", 2, 12,
DS("2000/2000T03", "v1", 0, IR("2000", 1), IR("2000T01", 2), IR("2000T02", 4)),
DS("2000T03/2000T04", "v1", 0, IR("2000T03", 8))
) /* Segment intersecting desired interval */
);
final List<Object[]> constructors = Lists.newArrayList();
for (final TestCase testCase : testCases) {
final ObjectMapper objectMapper = IngestSegmentFirehoseFactoryTest.newObjectMapper();
final TaskActionClient taskActionClient = new TaskActionClient()
{
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
if (taskAction instanceof SegmentListUsedAction) {
// Expect the interval we asked for
final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
if (action.getInterval().equals(testCase.interval)) {
return (RetType) ImmutableList.copyOf(testCase.segments);
} else {
throw new IllegalArgumentException("WTF");
}
} else {
throw new UnsupportedOperationException();
}
}
};
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null),
new TaskActionClientFactory()
{
@Override
public TaskActionClient create(Task task)
{
return taskActionClient;
}
},
new NoopServiceEmitter(),
null, // segment pusher
null, // segment killer
null, // segment mover
null, // segment archiver
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}, objectMapper
)
),
objectMapper
);
final Injector injector = Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory);
}
}
);
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
DATA_SOURCE,
testCase.interval,
new NoopDimFilter(),
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
injector
);
constructors.add(
new Object[]{
testCase.toString(),
factory,
testCase.tmpDir,
testCase.expectedCount,
testCase.expectedSum
}
);
}
return constructors;
}
private static class TestCase
{
final File tmpDir;
final Interval interval;
final int expectedCount;
final long expectedSum;
final Set<DataSegment> segments;
public TestCase(
File tmpDir,
Interval interval,
int expectedCount,
long expectedSum,
Set<DataSegment> segments
)
{
this.tmpDir = tmpDir;
this.interval = interval;
this.expectedCount = expectedCount;
this.expectedSum = expectedSum;
this.segments = segments;
}
@Override
public String toString()
{
final List<String> segmentIdentifiers = Lists.newArrayList();
for (DataSegment segment : segments) {
segmentIdentifiers.add(segment.getIdentifier());
}
return "TestCase{" +
"interval=" + interval +
", expectedCount=" + expectedCount +
", expectedSum=" + expectedSum +
", segments=" + segmentIdentifiers +
'}';
}
}
private static class DataSegmentMaker
{
final Interval interval;
final String version;
final int partitionNum;
final List<InputRow> rows;
public DataSegmentMaker(
Interval interval,
String version,
int partitionNum,
List<InputRow> rows
)
{
this.interval = interval;
this.version = version;
this.partitionNum = partitionNum;
this.rows = rows;
}
public DataSegment make(File tmpDir)
{
final Map<String, Object> loadSpec = persist(tmpDir, Iterables.toArray(rows, InputRow.class));
return new DataSegment(
DATA_SOURCE,
interval,
version,
loadSpec,
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
new LinearShardSpec(partitionNum),
-1,
0L
);
}
}
}

View File

@ -37,13 +37,11 @@ import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
@ -55,21 +53,27 @@ public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics, final DimFilter dimFilter, final Interval interval, final QueryGranularity granularity)
public IngestSegmentFirehose(
final List<WindowedStorageAdapter> adapters,
final List<String> dims,
final List<String> metrics,
final DimFilter dimFilter,
final QueryGranularity granularity
)
{
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<StorageAdapter, Sequence<InputRow>>()
adapters, new Function<WindowedStorageAdapter, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(StorageAdapter adapter)
public Sequence<InputRow> apply(WindowedStorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
adapter.getAdapter().makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
adapter.getInterval(),
granularity
), new Function<Cursor, Sequence<InputRow>>()
{
@ -199,4 +203,5 @@ public class IngestSegmentFirehose implements Firehose
{
rowYielder.close();
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;
public class WindowedStorageAdapter
{
private final StorageAdapter adapter;
private final Interval interval;
public WindowedStorageAdapter(StorageAdapter adapter, Interval interval)
{
this.adapter = adapter;
this.interval = interval;
}
public StorageAdapter getAdapter()
{
return adapter;
}
public Interval getInterval()
{
return interval;
}
@Override
public String toString()
{
return "WindowedStorageAdapter{" +
"adapter=" + adapter +
", interval=" + interval +
'}';
}
}