Remove timeline chunk count assumptions.

* Replace with generic iterables
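
Every file in this commit replaces the same assumption: code that read only the first partition chunk of a TimelineObjectHolder via holder.getObject().getChunk(0).getObject() now walks every PartitionChunk the holder contains. A minimal sketch of that pattern, assuming only the Guava and Druid timeline types that already appear in the diff (the helper name allSegments and the eager list materialization are illustrative, not part of the commit):

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;

import java.util.List;

public class ChunkFlattenSketch
{
  // Illustrative helper: collect the payload of every chunk of every holder,
  // instead of assuming a single chunk per holder (getChunk(0)).
  public static List<DataSegment> allSegments(
      Iterable<TimelineObjectHolder<String, DataSegment>> holders
  )
  {
    return Lists.newArrayList(
        Iterables.concat(
            Iterables.transform(
                holders,
                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
                {
                  @Override
                  public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> holder)
                  {
                    // holder.getObject() is a PartitionHolder, i.e. an Iterable of PartitionChunk
                    return Iterables.transform(
                        holder.getObject(),
                        new Function<PartitionChunk<DataSegment>, DataSegment>()
                        {
                          @Override
                          public DataSegment apply(PartitionChunk<DataSegment> chunk)
                          {
                            return chunk.getObject();
                          }
                        }
                    );
                  }
                }
            )
        )
    );
  }
}

RealtimePlumber keeps its getChunk(0) access because, as the comment added there notes, its timeline always holds a SingleElementPartitionChunk.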
msprunck 2015-03-10 22:04:47 +01:00
parent 9fd14cad4f
commit 942c17a2aa
6 changed files with 746 additions and 176 deletions

AppendTask.java

@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import io.druid.segment.IndexIO;
@@ -33,8 +34,10 @@ import io.druid.segment.RowboatFilteringIndexAdapter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.PartitionChunk;
 import org.joda.time.Interval;

+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
@@ -65,22 +68,36 @@ public class AppendTask extends MergeTaskBase
       timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
     }

-    final List<SegmentToMergeHolder> segmentsToMerge = Lists.transform(
-        timeline.lookup(new Interval("1000-01-01/3000-01-01")),
-        new Function<TimelineObjectHolder<String, DataSegment>, SegmentToMergeHolder>()
-        {
-          @Override
-          public SegmentToMergeHolder apply(TimelineObjectHolder<String, DataSegment> input)
-          {
-            final DataSegment segment = input.getObject().getChunk(0).getObject();
-            final File file = Preconditions.checkNotNull(
-                segments.get(segment),
-                "File for segment %s", segment.getIdentifier()
-            );
-
-            return new SegmentToMergeHolder(segment, input.getInterval(), file);
-          }
-        }
+    final Iterable<SegmentToMergeHolder> segmentsToMerge = Iterables.concat(
+        Iterables.transform(
+            timeline.lookup(new Interval("1000-01-01/3000-01-01")),
+            new Function<TimelineObjectHolder<String, DataSegment>, Iterable<SegmentToMergeHolder>>()
+            {
+              @Override
+              public Iterable<SegmentToMergeHolder> apply(final TimelineObjectHolder<String, DataSegment> input)
+              {
+                return Iterables.transform(
+                    input.getObject(),
+                    new Function<PartitionChunk<DataSegment>, SegmentToMergeHolder>()
+                    {
+                      @Nullable
+                      @Override
+                      public SegmentToMergeHolder apply(PartitionChunk<DataSegment> chunkInput)
+                      {
+                        DataSegment segment = chunkInput.getObject();
+                        return new SegmentToMergeHolder(
+                            segment, input.getInterval(),
+                            Preconditions.checkNotNull(
+                                segments.get(segment),
+                                "File for segment %s", segment.getIdentifier()
+                            )
+                        );
+                      }
+                    }
+                );
+              }
+            }
+        )
     );

     List<IndexableAdapter> adapters = Lists.newArrayList();

IngestSegmentFirehoseFactory.java

@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.Yielder;
@@ -61,6 +62,7 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.PartitionChunk;
 import io.druid.utils.Runnables;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -68,7 +70,6 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -166,15 +167,43 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
       dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensions();
     } else {
-      Set<String> dimSet = new HashSet<>();
-      for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
-        dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions());
-      }
+      Set<String> dimSet = Sets.newHashSet(
+          Iterables.concat(
+              Iterables.transform(
+                  timeLineSegments,
+                  new Function<TimelineObjectHolder<String, DataSegment>, Iterable<String>>()
+                  {
+                    @Override
+                    public Iterable<String> apply(
+                        TimelineObjectHolder<String, DataSegment> timelineObjectHolder
+                    )
+                    {
+                      return Iterables.concat(
+                          Iterables.transform(
+                              timelineObjectHolder.getObject(),
+                              new Function<PartitionChunk<DataSegment>, Iterable<String>>()
+                              {
+                                @Override
+                                public Iterable<String> apply(PartitionChunk<DataSegment> input)
+                                {
+                                  return input.getObject().getDimensions();
+                                }
+                              }
+                          )
+                      );
+                    }
+                  }
+              )
+          )
+      );
       dims = Lists.newArrayList(
           Sets.difference(
               dimSet,
-              inputRowParser.getParseSpec().getDimensionsSpec()
-                            .getDimensionExclusions()
+              inputRowParser
+                  .getParseSpec()
+                  .getDimensionsSpec()
+                  .getDimensionExclusions()
           )
       );
     }
@@ -183,35 +212,79 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     if (metrics != null) {
       metricsList = metrics;
     } else {
-      Set<String> metricsSet = new HashSet<>();
-      for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timeLineSegments) {
-        metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getMetrics());
-      }
+      Set<String> metricsSet = Sets.newHashSet(
+          Iterables.concat(
+              Iterables.transform(
+                  timeLineSegments,
+                  new Function<TimelineObjectHolder<String, DataSegment>, Iterable<String>>()
+                  {
+                    @Override
+                    public Iterable<String> apply(
+                        TimelineObjectHolder<String, DataSegment> input
+                    )
+                    {
+                      return Iterables.concat(
+                          Iterables.transform(
+                              input.getObject(),
+                              new Function<PartitionChunk<DataSegment>, Iterable<String>>()
+                              {
+                                @Override
+                                public Iterable<String> apply(PartitionChunk<DataSegment> input)
+                                {
+                                  return input.getObject().getMetrics();
+                                }
+                              }
+                          )
+                      );
+                    }
+                  }
+              )
+          )
+      );
       metricsList = Lists.newArrayList(metricsSet);
     }

-    final List<StorageAdapter> adapters = Lists.transform(
-        timeLineSegments,
-        new Function<TimelineObjectHolder<String, DataSegment>, StorageAdapter>()
-        {
-          @Override
-          public StorageAdapter apply(TimelineObjectHolder<String, DataSegment> input)
-          {
-            final DataSegment segment = input.getObject().getChunk(0).getObject();
-            final File file = Preconditions.checkNotNull(
-                segmentFileMap.get(segment),
-                "File for segment %s", segment.getIdentifier()
-            );
-
-            try {
-              return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file)));
-            }
-            catch (IOException e) {
-              throw Throwables.propagate(e);
-            }
-          }
-        }
+    final List<StorageAdapter> adapters = Lists.newArrayList(
+        Iterables.concat(
+            Iterables.transform(
+                timeLineSegments,
+                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<StorageAdapter>>()
+                {
+                  @Override
+                  public Iterable<StorageAdapter> apply(
+                      TimelineObjectHolder<String, DataSegment> input
+                  )
+                  {
+                    return
+                        Iterables.transform(
+                            input.getObject(),
+                            new Function<PartitionChunk<DataSegment>, StorageAdapter>()
+                            {
+                              @Override
+                              public StorageAdapter apply(PartitionChunk<DataSegment> input)
+                              {
+                                final DataSegment segment = input.getObject();
+                                try {
+                                  return new QueryableIndexStorageAdapter(
+                                      IndexIO.loadIndex(
+                                          Preconditions.checkNotNull(
+                                              segmentFileMap.get(segment),
+                                              "File for segment %s", segment.getIdentifier()
+                                          )
+                                      )
+                                  );
+                                }
+                                catch (IOException e) {
+                                  throw Throwables.propagate(e);
+                                }
+                              }
+                            }
+                        );
+                  }
+                }
+            )
+        )
     );

     return new IngestSegmentFirehose(adapters, dims, metricsList);
@@ -235,112 +308,112 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
      Sequence<InputRow> rows = Sequences.concat(
          Iterables.transform(
              adapters, new Function<StorageAdapter, Sequence<InputRow>>()
              {
                @Nullable
                @Override
                public Sequence<InputRow> apply(StorageAdapter adapter)
                {
                  return Sequences.concat(
                      Sequences.map(
                          adapter.makeCursors(
                              Filters.convertDimensionFilters(dimFilter),
                              interval,
                              QueryGranularity.ALL
                          ), new Function<Cursor, Sequence<InputRow>>()
                          {
                            @Nullable
                            @Override
                            public Sequence<InputRow> apply(final Cursor cursor)
                            {
                              final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);

                              final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
                              for (String dim : dims) {
                                final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null);
                                // dimSelector is null if the dimension is not present
                                if (dimSelector != null) {
                                  dimSelectors.put(dim, dimSelector);
                                }
                              }

                              final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
                              for (String metric : metrics) {
                                final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
                                if (metricSelector != null) {
                                  metSelectors.put(metric, metricSelector);
                                }
                              }

                              return Sequences.simple(
                                  new Iterable<InputRow>()
                                  {
                                    @Override
                                    public Iterator<InputRow> iterator()
                                    {
                                      return new Iterator<InputRow>()
                                      {
                                        @Override
                                        public boolean hasNext()
                                        {
                                          return !cursor.isDone();
                                        }

                                        @Override
                                        public InputRow next()
                                        {
                                          final Map<String, Object> theEvent = Maps.newLinkedHashMap();
                                          final long timestamp = timestampColumnSelector.get();
                                          theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));

                                          for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
                                            final String dim = dimSelector.getKey();
                                            final DimensionSelector selector = dimSelector.getValue();
                                            final IndexedInts vals = selector.getRow();

                                            if (vals.size() == 1) {
                                              final String dimVal = selector.lookupName(vals.get(0));
                                              theEvent.put(dim, dimVal);
                                            } else {
                                              List<String> dimVals = Lists.newArrayList();
                                              for (int i = 0; i < vals.size(); ++i) {
                                                dimVals.add(selector.lookupName(vals.get(i)));
                                              }
                                              theEvent.put(dim, dimVals);
                                            }
                                          }

                                          for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
                                            final String metric = metSelector.getKey();
                                            final ObjectColumnSelector selector = metSelector.getValue();
                                            theEvent.put(metric, selector.get());
                                          }

                                          cursor.advance();
                                          return new MapBasedInputRow(timestamp, dims, theEvent);
                                        }

                                        @Override
                                        public void remove()
                                        {
                                          throw new UnsupportedOperationException("Remove Not Supported");
                                        }
                                      };
                                    }
                                  }
                              );
                            }
                          }
                      )
                  );
                }
              }
          )
      );

      rowYielder = rows.toYielder(
          null,
-         new YieldingAccumulator()
+         new YieldingAccumulator<InputRow, InputRow>()
          {
            @Override
-           public Object accumulate(Object accumulated, Object in)
+           public InputRow accumulate(InputRow accumulated, InputRow in)
            {
              yield();
              return in;

IngestSegmentFirehoseFactoryTest.java (new file)

@@ -0,0 +1,460 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexing.firehose;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexMerger;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*
*/
@RunWith(Parameterized.class)
public class IngestSegmentFirehoseFactoryTest
{
@Parameterized.Parameters(name = "{1}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final HeapMemoryTaskStorage ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
}
);
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withQueryGranularity(QueryGranularity.NONE)
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
.withMetrics(
new AggregatorFactory[]{
new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
}
)
.build();
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
schema,
MAX_ROWS * MAX_SHARD_NUMBER
);
for (Integer i = 0; i < MAX_ROWS; ++i) {
index.add(ROW_PARSER.parse(buildRow(i.longValue())));
}
if (!persistDir.mkdirs() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
}
IndexMerger.persist(index, persistDir);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{
return ImmutableList.copyOf(segmentSet);
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{
return ImmutableList.of();
}
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
{
Set<DataSegment> added = Sets.newHashSet();
for (final DataSegment segment : segments) {
if (published.add(segment)) {
added.add(segment);
}
}
return ImmutableSet.copyOf(added);
}
@Override
public void deleteSegments(Set<DataSegment> segments)
{
nuked.addAll(segments);
}
};
final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory(
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter())
);
final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null),
tac,
newMockEmitter(),
new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
return segment;
}
},
new DataSegmentKiller()
{
@Override
public void kill(DataSegment segments) throws SegmentLoadingException
{
}
},
new DataSegmentMover()
{
@Override
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec)
throws SegmentLoadingException
{
return dataSegment;
}
},
new DataSegmentArchiver()
{
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
@Override
public DataSegment restore(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
},
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
null, // monitor scheduler
new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
new LocalDataSegmentPuller()
),
null,
new SegmentLoaderConfig()
{
@Override
public List<StorageLocationConfig> getLocations()
{
return Lists.newArrayList();
}
}
)
),
new DefaultObjectMapper()
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(
ROW_PARSER,
new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto"),
new DimensionsSpec(
ImmutableList.<String>of(),
ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
ImmutableList.<SpatialDimensionSchema>of()
)
)
)
)) {
for (List<String> dim_names : Arrays.<List<String>>asList(null, ImmutableList.of(DIM_NAME))) {
for (List<String> metric_names : Arrays.<List<String>>asList(
null,
ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME)
)) {
values.add(
new Object[]{
new IngestSegmentFirehoseFactory(
DATA_SOURCE_NAME,
FOREVER,
new SelectorDimFilter(DIM_NAME, DIM_VALUE),
dim_names,
metric_names,
Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory);
}
}
)
),
String.format(
"DimNames[%s]MetricNames[%s]ParserDimNames[%s]",
dim_names == null ? "null" : "dims",
metric_names == null ? "null" : "metrics",
parser == ROW_PARSER ? "dims" : "null"
),
parser
}
);
}
}
}
return values;
}
public IngestSegmentFirehoseFactoryTest(
IngestSegmentFirehoseFactory factory,
String testName,
InputRowParser rowParser
)
{
this.factory = factory;
this.rowParser = rowParser;
}
private static final Logger log = new Logger(IngestSegmentFirehoseFactoryTest.class);
private static final Interval FOREVER = new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT);
private static final String DATA_SOURCE_NAME = "testDataSource";
private static final String DATA_SOURCE_VERSION = "version";
private static final Integer BINARY_VERSION = -1;
private static final String DIM_NAME = "testDimName";
private static final String DIM_VALUE = "testDimValue";
private static final String DIM_LONG_NAME = "testDimLongName";
private static final String DIM_FLOAT_NAME = "testDimFloatName";
private static final String METRIC_LONG_NAME = "testLongMetric";
private static final String METRIC_FLOAT_NAME = "testFloatMetric";
private static final Long METRIC_LONG_VALUE = 1l;
private static final Float METRIC_FLOAT_VALUE = 1.0f;
private static final String TIME_COLUMN = "ts";
private static final Integer MAX_SHARD_NUMBER = 10;
private static final Integer MAX_ROWS = 10;
private static final File tmpDir = Files.createTempDir();
private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile();
private static final List<DataSegment> segmentSet = new ArrayList<>(MAX_SHARD_NUMBER);
private final IngestSegmentFirehoseFactory factory;
private final InputRowParser rowParser;
private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto"),
new DimensionsSpec(
ImmutableList.of(DIM_NAME),
ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
ImmutableList.<SpatialDimensionSchema>of()
)
)
);
private static Map<String, Object> buildRow(Long ts)
{
return ImmutableMap.<String, Object>of(
TIME_COLUMN, ts,
DIM_NAME, DIM_VALUE,
DIM_FLOAT_NAME, METRIC_FLOAT_VALUE,
DIM_LONG_NAME, METRIC_LONG_VALUE
);
}
private static DataSegment buildSegment(Integer shardNumber)
{
Preconditions.checkArgument(shardNumber < MAX_SHARD_NUMBER);
Preconditions.checkArgument(shardNumber >= 0);
return new DataSegment(
DATA_SOURCE_NAME,
FOREVER,
DATA_SOURCE_VERSION,
ImmutableMap.<String, Object>of(
"type", "local",
"path", persistDir.getAbsolutePath()
),
ImmutableList.of(DIM_NAME),
ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
new NumberedShardSpec(
shardNumber,
MAX_SHARD_NUMBER
),
BINARY_VERSION,
0l
);
}
@BeforeClass
public static void setUpStatic() throws IOException
{
for (int i = 0; i < MAX_SHARD_NUMBER; ++i) {
segmentSet.add(buildSegment(i));
}
}
@AfterClass
public static void tearDownStatic()
{
recursivelyDelete(tmpDir);
}
private static void recursivelyDelete(final File dir)
{
if (dir != null) {
if (dir.isDirectory()) {
final File[] files = dir.listFiles();
if (files != null) {
for (File file : files) {
recursivelyDelete(file);
}
}
} else {
if (!dir.delete()) {
log.warn("Could not delete file at [%s]", dir.getAbsolutePath());
}
}
}
}
@Test
public void sanityTest()
{
Assert.assertEquals(DATA_SOURCE_NAME, factory.getDataSource());
if (factory.getDimensions() != null) {
Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray());
}
Assert.assertEquals(FOREVER, factory.getInterval());
if (factory.getMetrics() != null) {
Assert.assertEquals(
ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
ImmutableSet.copyOf(factory.getMetrics())
);
}
}
@Test
public void simpleFirehoseReadingTest() throws IOException
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose =
(IngestSegmentFirehoseFactory.IngestSegmentFirehose)
factory.connect(rowParser)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getLongMetric(METRIC_LONG_NAME));
Assert.assertEquals(METRIC_FLOAT_VALUE, row.getFloatMetric(METRIC_FLOAT_NAME), METRIC_FLOAT_VALUE * 0.0001);
++rowcount;
}
}
Assert.assertEquals((int) MAX_SHARD_NUMBER * MAX_ROWS, (int) rowcount);
}
private static ServiceEmitter newMockEmitter()
{
return new ServiceEmitter(null, null, null)
{
@Override
public void emit(Event event)
{
}
@Override
public void emit(ServiceEventBuilder builder)
{
}
};
}
}

SchemalessIndex.java

@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.hash.Hashing;
 import com.metamx.common.Pair;
+import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.granularity.QueryGranularity;
@@ -43,6 +44,7 @@ import io.druid.segment.serde.ComplexMetrics;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.PartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -426,8 +428,6 @@ public class SchemalessIndex
     List<File> filesToMap = makeFilesToMap(tmpFile, files);

-    List<IndexableAdapter> adapters = Lists.newArrayList();
-
     VersionedIntervalTimeline<Integer, File> timeline = new VersionedIntervalTimeline<Integer, File>(
         Ordering.natural().nullsFirst()
     );
@@ -438,33 +438,49 @@
       timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i)));
     }

-    List<Pair<File, Interval>> intervalsToMerge = Lists.transform(
-        timeline.lookup(new Interval("1000-01-01/3000-01-01")),
-        new Function<TimelineObjectHolder<Integer, File>, Pair<File, Interval>>()
-        {
-          @Override
-          public Pair<File, Interval> apply(@Nullable TimelineObjectHolder<Integer, File> input)
-          {
-            return new Pair<File, Interval>(input.getObject().getChunk(0).getObject(), input.getInterval());
-          }
-        }
-    );
-
-    for (final Pair<File, Interval> pair : intervalsToMerge) {
-      adapters.add(
-          new RowboatFilteringIndexAdapter(
-              new QueryableIndexIndexableAdapter(IndexIO.loadIndex(pair.lhs)),
-              new Predicate<Rowboat>()
-              {
-                @Override
-                public boolean apply(@Nullable Rowboat input)
-                {
-                  return pair.rhs.contains(input.getTimestamp());
-                }
-              }
-          )
-      );
-    }
+    final List<IndexableAdapter> adapters = Lists.newArrayList(
+        Iterables.concat(
+            // TimelineObjectHolder is actually an iterable of iterable of indexable adapters
+            Iterables.transform(
+                timeline.lookup(new Interval("1000-01-01/3000-01-01")),
+                new Function<TimelineObjectHolder<Integer, File>, Iterable<IndexableAdapter>>()
+                {
+                  @Override
+                  public Iterable<IndexableAdapter> apply(final TimelineObjectHolder<Integer, File> timelineObjectHolder)
+                  {
+                    return Iterables.transform(
+                        timelineObjectHolder.getObject(),
+                        // Each chunk can be used to build the actual IndexableAdapter
+                        new Function<PartitionChunk<File>, IndexableAdapter>()
+                        {
+                          @Override
+                          public IndexableAdapter apply(PartitionChunk<File> chunk)
+                          {
+                            try {
+                              return new RowboatFilteringIndexAdapter(
+                                  new QueryableIndexIndexableAdapter(IndexIO.loadIndex(chunk.getObject())),
+                                  new Predicate<Rowboat>()
+                                  {
+                                    @Override
+                                    public boolean apply(Rowboat input)
+                                    {
+                                      return timelineObjectHolder.getInterval().contains(input.getTimestamp());
+                                    }
+                                  }
+                              );
+                            }
+                            catch (IOException e) {
+                              throw Throwables.propagate(e);
+                            }
+                          }
+                        }
+                    );
+                  }
+                }
+            )
+        )
+    );

     return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile));
   }

IndexerSQLMetadataStorageCoordinator.java

@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
@@ -115,19 +116,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           }
         );

-    final List<DataSegment> segments = Lists.transform(
-        timeline.lookup(interval),
-        new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
-        {
-          @Override
-          public DataSegment apply(TimelineObjectHolder<String, DataSegment> input)
-          {
-            return input.getObject().getChunk(0).getObject();
-          }
-        }
+    return Lists.newArrayList(
+        Iterables.concat(
+            Iterables.transform(
+                timeline.lookup(interval),
+                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
+                {
+                  @Override
+                  public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> input)
+                  {
+                    return input.getObject().payloads();
+                  }
+                }
+            )
+        )
     );
-
-    return segments;
   }

   /**

RealtimePlumber.java

@@ -267,6 +267,7 @@ public class RealtimePlumber implements Plumber
       throw new ISE("No timeline entry at all!");
     }

+    // The realtime plumber always uses SingleElementPartitionChunk
     final Sink theSink = holder.getObject().getChunk(0).getObject();

     if (theSink == null) {