From 942c17a2aad1f80381dca7faf7ee05d3fadecae0 Mon Sep 17 00:00:00 2001 From: msprunck Date: Tue, 10 Mar 2015 22:04:47 +0100 Subject: [PATCH] Remove timeline chunk count assumptions. * Replace with generic iterables --- .../indexing/common/task/AppendTask.java | 49 +- .../IngestSegmentFirehoseFactory.java | 315 +++++++----- .../IngestSegmentFirehoseFactoryTest.java | 460 ++++++++++++++++++ .../io/druid/segment/SchemalessIndex.java | 70 ++- .../IndexerSQLMetadataStorageCoordinator.java | 27 +- .../realtime/plumber/RealtimePlumber.java | 1 + 6 files changed, 746 insertions(+), 176 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java index b8705ebcb42..ddf2a8c5929 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import io.druid.segment.IndexIO; @@ -33,8 +34,10 @@ import io.druid.segment.RowboatFilteringIndexAdapter; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Map; @@ -65,22 +68,36 @@ public class AppendTask extends MergeTaskBase timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); } - final List segmentsToMerge = Lists.transform( - timeline.lookup(new Interval("1000-01-01/3000-01-01")), - new Function, SegmentToMergeHolder>() - { - @Override - public SegmentToMergeHolder apply(TimelineObjectHolder input) - { - final DataSegment segment = input.getObject().getChunk(0).getObject(); - final File file = Preconditions.checkNotNull( - segments.get(segment), - "File for segment %s", segment.getIdentifier() - ); - - return new SegmentToMergeHolder(segment, input.getInterval(), file); - } - } + final Iterable segmentsToMerge = Iterables.concat( + Iterables.transform( + timeline.lookup(new Interval("1000-01-01/3000-01-01")), + new Function, Iterable>() + { + @Override + public Iterable apply(final TimelineObjectHolder input) + { + return Iterables.transform( + input.getObject(), + new Function, SegmentToMergeHolder>() + { + @Nullable + @Override + public SegmentToMergeHolder apply(PartitionChunk chunkInput) + { + DataSegment segment = chunkInput.getObject(); + return new SegmentToMergeHolder( + segment, input.getInterval(), + Preconditions.checkNotNull( + segments.get(segment), + "File for segment %s", segment.getIdentifier() + ) + ); + } + } + ); + } + } + ) ); List adapters = Lists.newArrayList(); diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index a400a00785d..98f48800f84 100644 --- 
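For reference, every hunk in this patch applies the same change: stop assuming a single PartitionChunk per TimelineObjectHolder (the old getChunk(0) call) and instead flatten all chunks with Guava's Iterables. A minimal sketch of that flattening, assuming the Druid 0.7-era timeline classes used in this patch; the class and method names below are illustrative only, not part of the change:

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.partition.PartitionChunk;

import java.util.List;

public class TimelineChunkFlattening
{
  // Collect every payload from every chunk of every holder, instead of only
  // the first chunk of each holder as the pre-patch code did.
  public static Iterable<DataSegment> allPayloads(
      final List<TimelineObjectHolder<String, DataSegment>> holders
  )
  {
    return Iterables.concat(
        Iterables.transform(
            holders,
            new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
            {
              @Override
              public Iterable<DataSegment> apply(final TimelineObjectHolder<String, DataSegment> holder)
              {
                // PartitionHolder iterates over its PartitionChunks
                return Iterables.transform(
                    holder.getObject(),
                    new Function<PartitionChunk<DataSegment>, DataSegment>()
                    {
                      @Override
                      public DataSegment apply(PartitionChunk<DataSegment> chunk)
                      {
                        return chunk.getObject();
                      }
                    }
                );
              }
            }
        )
    );
  }
}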
a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Injector; +import com.google.inject.Provider; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Yielder; @@ -61,6 +62,7 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.timeline.partition.PartitionChunk; import io.druid.utils.Runnables; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -68,7 +70,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -166,15 +167,43 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory dimSet = new HashSet<>(); - for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { - dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions()); - } + Set dimSet = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + timeLineSegments, + new Function, Iterable>() + { + @Override + public Iterable apply( + TimelineObjectHolder timelineObjectHolder + ) + { + return Iterables.concat( + Iterables.transform( + timelineObjectHolder.getObject(), + new Function, Iterable>() + { + @Override + public Iterable apply(PartitionChunk input) + { + return input.getObject().getDimensions(); + } + } + ) + ); + } + } + + ) + ) + ); dims = Lists.newArrayList( Sets.difference( dimSet, - inputRowParser.getParseSpec().getDimensionsSpec() - .getDimensionExclusions() + inputRowParser + .getParseSpec() + .getDimensionsSpec() + .getDimensionExclusions() ) ); } @@ -183,35 +212,79 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory metricsSet = new HashSet<>(); - for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { - metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getMetrics()); - } + Set metricsSet = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + timeLineSegments, + new Function, Iterable>() + { + @Override + public Iterable apply( + TimelineObjectHolder input + ) + { + return Iterables.concat( + Iterables.transform( + input.getObject(), + new Function, Iterable>() + { + @Override + public Iterable apply(PartitionChunk input) + { + return input.getObject().getMetrics(); + } + } + ) + ); + } + } + ) + ) + ); metricsList = Lists.newArrayList(metricsSet); } - final List adapters = Lists.transform( - timeLineSegments, - new Function, StorageAdapter>() - { - @Override - public StorageAdapter apply(TimelineObjectHolder input) - { - final DataSegment segment = input.getObject().getChunk(0).getObject(); - final File file = Preconditions.checkNotNull( - segmentFileMap.get(segment), - "File for segment %s", segment.getIdentifier() - ); - - try { - return new QueryableIndexStorageAdapter((IndexIO.loadIndex(file))); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - } + final List adapters = Lists.newArrayList( + Iterables.concat( + Iterables.transform( + timeLineSegments, + new Function, 
Iterable>() + { + @Override + public Iterable apply( + TimelineObjectHolder input + ) + { + return + Iterables.transform( + input.getObject(), + new Function, StorageAdapter>() + { + @Override + public StorageAdapter apply(PartitionChunk input) + { + final DataSegment segment = input.getObject(); + try { + return new QueryableIndexStorageAdapter( + IndexIO.loadIndex( + Preconditions.checkNotNull( + segmentFileMap.get(segment), + "File for segment %s", segment.getIdentifier() + ) + ) + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } + ) + ) ); return new IngestSegmentFirehose(adapters, dims, metricsList); @@ -235,112 +308,112 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory rows = Sequences.concat( Iterables.transform( adapters, new Function>() - { - @Nullable - @Override - public Sequence apply(StorageAdapter adapter) - { - return Sequences.concat( - Sequences.map( - adapter.makeCursors( - Filters.convertDimensionFilters(dimFilter), - interval, - QueryGranularity.ALL - ), new Function>() - { - @Nullable - @Override - public Sequence apply(final Cursor cursor) - { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - - final Map dimSelectors = Maps.newHashMap(); - for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); - } - } - - final Map metSelectors = Maps.newHashMap(); - for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); - if (metricSelector != null) { - metSelectors.put(metric, metricSelector); - } - } - - return Sequences.simple( - new Iterable() + { + @Nullable + @Override + public Sequence apply(StorageAdapter adapter) + { + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + Filters.convertDimensionFilters(dimFilter), + interval, + QueryGranularity.ALL + ), new Function>() { + @Nullable @Override - public Iterator iterator() + public Sequence apply(final Cursor cursor) { - return new Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); + final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + + final Map dimSelectors = Maps.newHashMap(); + for (String dim : dims) { + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); } + } - @Override - public InputRow next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.get(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + if (metricSelector != null) { + metSelectors.put(metric, metricSelector); + } + } - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } - 
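An aside on the branch just below: getRow() returns an IndexedInts of dictionary ids, and the convention is to unwrap a single-valued row to a plain String while a multi-valued row becomes a List<String>. The same logic as a small standalone helper, reusing the selector types already imported in this file (the helper name is illustrative, not part of the patch):

// Mirrors the single-value vs. multi-value handling used below (illustration only).
private static Object dimensionValue(final DimensionSelector selector)
{
  final IndexedInts row = selector.getRow();
  if (row.size() == 1) {
    // one dictionary id -> decode and store the String directly
    return selector.lookupName(row.get(0));
  }
  final List<String> values = Lists.newArrayList();
  for (int i = 0; i < row.size(); ++i) {
    values.add(selector.lookupName(row.get(i)));
  }
  return values;
}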
if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.get(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + }; } } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - theEvent.put(metric, selector.get()); - } - cursor.advance(); - return new MapBasedInputRow(timestamp, dims, theEvent); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - }; + ); } } - ); - } - } - ) - ); - } - } + ) + ); + } + } ) ); rowYielder = rows.toYielder( null, - new YieldingAccumulator() + new YieldingAccumulator() { @Override - public Object accumulate(Object accumulated, Object in) + public InputRow accumulate(InputRow accumulated, InputRow in) { yield(); return in; diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java new file mode 100644 index 00000000000..1650929b156 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -0,0 +1,460 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
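The rowYielder built just above, with its accumulate callback now typed to InputRow, is what backs the firehose's hasMore()/nextRow() pair. For context, a minimal sketch of draining a Sequence through a Yielder in the same style, assuming the com.metamx.common.guava types already imported in that file and the rows sequence defined above (variable names are illustrative):

Yielder<InputRow> yielder = rows.toYielder(
    null,
    new YieldingAccumulator<InputRow, InputRow>()
    {
      @Override
      public InputRow accumulate(InputRow accumulated, InputRow in)
      {
        yield();   // pause after every accumulated row
        return in; // the row is then handed back via yielder.get()
      }
    }
);
while (!yielder.isDone()) {
  final InputRow row = yielder.get(); // the row most recently yielded
  // ... use row ...
  yielder = yielder.next(null);       // resume the underlying sequence
}
yielder.close(); // Yielder is Closeable; assumes the caller handles the IOException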
+ */ + +package io.druid.indexing.firehose; + +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Module; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; +import io.druid.common.utils.JodaUtils; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.SegmentLoaderFactory; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.overlord.HeapMemoryTaskStorage; +import io.druid.indexing.overlord.TaskLockbox; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.segment.IndexMerger; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.loading.DataSegmentArchiver; +import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.DataSegmentMover; +import io.druid.segment.loading.DataSegmentPuller; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.loading.OmniSegmentLoader; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.loading.StorageLocationConfig; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.Interval; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * + */ +@RunWith(Parameterized.class) +public class IngestSegmentFirehoseFactoryTest +{ + @Parameterized.Parameters(name = "{1}") + public static Collection constructorFeeder() throws IOException + { + + final HeapMemoryTaskStorage ts = new HeapMemoryTaskStorage( + new TaskStorageConfig(null) + { + } + ); + final IncrementalIndexSchema schema = new 
IncrementalIndexSchema.Builder() + .withQueryGranularity(QueryGranularity.NONE) + .withMinTimestamp(JodaUtils.MIN_INSTANT) + .withDimensionsSpec(ROW_PARSER) + .withMetrics( + new AggregatorFactory[]{ + new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME), + new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME) + } + ) + .build(); + final OnheapIncrementalIndex index = new OnheapIncrementalIndex( + schema, + MAX_ROWS * MAX_SHARD_NUMBER + ); + + for (Integer i = 0; i < MAX_ROWS; ++i) { + index.add(ROW_PARSER.parse(buildRow(i.longValue()))); + } + + if (!persistDir.mkdirs() && !persistDir.exists()) { + throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath())); + } + IndexMerger.persist(index, persistDir); + + final TaskLockbox tl = new TaskLockbox(ts); + final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) + { + final private Set published = Sets.newHashSet(); + final private Set nuked = Sets.newHashSet(); + + @Override + public List getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException + { + return ImmutableList.copyOf(segmentSet); + } + + @Override + public List getUnusedSegmentsForInterval(String dataSource, Interval interval) + { + return ImmutableList.of(); + } + + @Override + public Set announceHistoricalSegments(Set segments) + { + Set added = Sets.newHashSet(); + for (final DataSegment segment : segments) { + if (published.add(segment)) { + added.add(segment); + } + } + + return ImmutableSet.copyOf(added); + } + + @Override + public void deleteSegments(Set segments) + { + nuked.addAll(segments); + } + }; + final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory( + ts, + new TaskActionToolbox(tl, mdc, newMockEmitter()) + ); + final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( + new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null), + tac, + newMockEmitter(), + new DataSegmentPusher() + { + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment) throws IOException + { + return segment; + } + }, + new DataSegmentKiller() + { + @Override + public void kill(DataSegment segments) throws SegmentLoadingException + { + + } + }, + new DataSegmentMover() + { + @Override + public DataSegment move(DataSegment dataSegment, Map targetLoadSpec) + throws SegmentLoadingException + { + return dataSegment; + } + }, + new DataSegmentArchiver() + { + @Override + public DataSegment archive(DataSegment segment) throws SegmentLoadingException + { + return segment; + } + + @Override + public DataSegment restore(DataSegment segment) throws SegmentLoadingException + { + return segment; + } + }, + null, // segment announcer + null, // new segment server view + null, // query runner factory conglomerate corporation unionized collective + null, // query executor service + null, // monitor scheduler + new SegmentLoaderFactory( + new OmniSegmentLoader( + ImmutableMap.of( + "local", + new LocalDataSegmentPuller() + ), + null, + new SegmentLoaderConfig() + { + @Override + public List getLocations() + { + return Lists.newArrayList(); + } + } + ) + ), + new DefaultObjectMapper() + ); + Collection values = new LinkedList<>(); + for (InputRowParser parser : Arrays.asList( + ROW_PARSER, + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec(TIME_COLUMN, "auto"), + new DimensionsSpec( + 
ImmutableList.of(), + ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), + ImmutableList.of() + ) + ) + ) + )) { + for (List dim_names : Arrays.>asList(null, ImmutableList.of(DIM_NAME))) { + for (List metric_names : Arrays.>asList( + null, + ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME) + )) { + values.add( + new Object[]{ + new IngestSegmentFirehoseFactory( + DATA_SOURCE_NAME, + FOREVER, + new SelectorDimFilter(DIM_NAME, DIM_VALUE), + dim_names, + metric_names, + Guice.createInjector( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory); + } + } + ) + ), + String.format( + "DimNames[%s]MetricNames[%s]ParserDimNames[%s]", + dim_names == null ? "null" : "dims", + metric_names == null ? "null" : "metrics", + parser == ROW_PARSER ? "dims" : "null" + ), + parser + } + ); + } + } + } + return values; + } + + public IngestSegmentFirehoseFactoryTest( + IngestSegmentFirehoseFactory factory, + String testName, + InputRowParser rowParser + ) + { + this.factory = factory; + this.rowParser = rowParser; + } + + private static final Logger log = new Logger(IngestSegmentFirehoseFactoryTest.class); + private static final Interval FOREVER = new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT); + private static final String DATA_SOURCE_NAME = "testDataSource"; + private static final String DATA_SOURCE_VERSION = "version"; + private static final Integer BINARY_VERSION = -1; + private static final String DIM_NAME = "testDimName"; + private static final String DIM_VALUE = "testDimValue"; + private static final String DIM_LONG_NAME = "testDimLongName"; + private static final String DIM_FLOAT_NAME = "testDimFloatName"; + private static final String METRIC_LONG_NAME = "testLongMetric"; + private static final String METRIC_FLOAT_NAME = "testFloatMetric"; + private static final Long METRIC_LONG_VALUE = 1l; + private static final Float METRIC_FLOAT_VALUE = 1.0f; + private static final String TIME_COLUMN = "ts"; + private static final Integer MAX_SHARD_NUMBER = 10; + private static final Integer MAX_ROWS = 10; + private static final File tmpDir = Files.createTempDir(); + private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile(); + private static final List segmentSet = new ArrayList<>(MAX_SHARD_NUMBER); + + private final IngestSegmentFirehoseFactory factory; + private final InputRowParser rowParser; + + private static final InputRowParser> ROW_PARSER = new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec(TIME_COLUMN, "auto"), + new DimensionsSpec( + ImmutableList.of(DIM_NAME), + ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), + ImmutableList.of() + ) + ) + ); + + private static Map buildRow(Long ts) + { + return ImmutableMap.of( + TIME_COLUMN, ts, + DIM_NAME, DIM_VALUE, + DIM_FLOAT_NAME, METRIC_FLOAT_VALUE, + DIM_LONG_NAME, METRIC_LONG_VALUE + ); + } + + private static DataSegment buildSegment(Integer shardNumber) + { + Preconditions.checkArgument(shardNumber < MAX_SHARD_NUMBER); + Preconditions.checkArgument(shardNumber >= 0); + return new DataSegment( + DATA_SOURCE_NAME, + FOREVER, + DATA_SOURCE_VERSION, + ImmutableMap.of( + "type", "local", + "path", persistDir.getAbsolutePath() + ), + ImmutableList.of(DIM_NAME), + ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), + new NumberedShardSpec( + shardNumber, + MAX_SHARD_NUMBER + ), + BINARY_VERSION, + 0l + ); + } + + @BeforeClass + public static void setUpStatic() throws IOException + { + for (int i = 0; i 
< MAX_SHARD_NUMBER; ++i) { + segmentSet.add(buildSegment(i)); + } + } + + @AfterClass + public static void tearDownStatic() + { + recursivelyDelete(tmpDir); + } + + private static void recursivelyDelete(final File dir) + { + if (dir != null) { + if (dir.isDirectory()) { + final File[] files = dir.listFiles(); + if (files != null) { + for (File file : files) { + recursivelyDelete(file); + } + } + } else { + if (!dir.delete()) { + log.warn("Could not delete file at [%s]", dir.getAbsolutePath()); + } + } + } + } + + @Test + public void sanityTest() + { + Assert.assertEquals(DATA_SOURCE_NAME, factory.getDataSource()); + if (factory.getDimensions() != null) { + Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray()); + } + Assert.assertEquals(FOREVER, factory.getInterval()); + if (factory.getMetrics() != null) { + Assert.assertEquals( + ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME), + ImmutableSet.copyOf(factory.getMetrics()) + ); + } + } + + @Test + public void simpleFirehoseReadingTest() throws IOException + { + Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); + Integer rowcount = 0; + try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose = + (IngestSegmentFirehoseFactory.IngestSegmentFirehose) + factory.connect(rowParser)) { + while (firehose.hasMore()) { + InputRow row = firehose.nextRow(); + Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); + Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); + Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getLongMetric(METRIC_LONG_NAME)); + Assert.assertEquals(METRIC_FLOAT_VALUE, row.getFloatMetric(METRIC_FLOAT_NAME), METRIC_FLOAT_VALUE * 0.0001); + ++rowcount; + } + } + Assert.assertEquals((int) MAX_SHARD_NUMBER * MAX_ROWS, (int) rowcount); + } + + private static ServiceEmitter newMockEmitter() + { + return new ServiceEmitter(null, null, null) + { + @Override + public void emit(Event event) + { + + } + + @Override + public void emit(ServiceEventBuilder builder) + { + + } + }; + } +} diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 67e40ed842d..7e7fb3d31e8 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -27,6 +27,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.hash.Hashing; import com.metamx.common.Pair; +import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; @@ -43,6 +44,7 @@ import io.druid.segment.serde.ComplexMetrics; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.NoneShardSpec; +import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.ShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -426,8 +428,6 @@ public class SchemalessIndex List filesToMap = makeFilesToMap(tmpFile, files); - List adapters = Lists.newArrayList(); - VersionedIntervalTimeline timeline = new VersionedIntervalTimeline( Ordering.natural().nullsFirst() ); @@ -438,33 +438,49 @@ public class SchemalessIndex timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i))); } - List> intervalsToMerge = 
Lists.transform( - timeline.lookup(new Interval("1000-01-01/3000-01-01")), - new Function, Pair>() - { - @Override - public Pair apply(@Nullable TimelineObjectHolder input) - { - return new Pair(input.getObject().getChunk(0).getObject(), input.getInterval()); - } - } - ); - - for (final Pair pair : intervalsToMerge) { - adapters.add( - new RowboatFilteringIndexAdapter( - new QueryableIndexIndexableAdapter(IndexIO.loadIndex(pair.lhs)), - new Predicate() - { - @Override - public boolean apply(@Nullable Rowboat input) + final List adapters = Lists.newArrayList( + Iterables.concat( + // TimelineObjectHolder is actually an iterable of iterable of indexable adapters + Iterables.transform( + timeline.lookup(new Interval("1000-01-01/3000-01-01")), + new Function, Iterable>() { - return pair.rhs.contains(input.getTimestamp()); + @Override + public Iterable apply(final TimelineObjectHolder timelineObjectHolder) + { + return Iterables.transform( + timelineObjectHolder.getObject(), + + // Each chunk can be used to build the actual IndexableAdapter + new Function, IndexableAdapter>() + { + @Override + public IndexableAdapter apply(PartitionChunk chunk) + { + try { + return new RowboatFilteringIndexAdapter( + new QueryableIndexIndexableAdapter(IndexIO.loadIndex(chunk.getObject())), + new Predicate() + { + @Override + public boolean apply(Rowboat input) + { + return timelineObjectHolder.getInterval().contains(input.getTimestamp()); + } + } + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + ); + } } - } - ) - ); - } + ) + ) + ); return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile)); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 545ef25d996..df208a05093 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -115,19 +116,21 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } ); - final List segments = Lists.transform( - timeline.lookup(interval), - new Function, DataSegment>() - { - @Override - public DataSegment apply(TimelineObjectHolder input) - { - return input.getObject().getChunk(0).getObject(); - } - } + return Lists.newArrayList( + Iterables.concat( + Iterables.transform( + timeline.lookup(interval), + new Function, Iterable>() + { + @Override + public Iterable apply(TimelineObjectHolder input) + { + return input.getObject().payloads(); + } + } + ) + ) ); - - return segments; } /** diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 3701948ec0f..02c6786853e 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -267,6 +267,7 @@ public class RealtimePlumber implements Plumber throw new ISE("No timeline entry at all!"); } + // The realtime plumber always uses 
SingleElementPartitionChunk
      final Sink theSink = holder.getObject().getChunk(0).getObject();

      if (theSink == null) {
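The plumber keeps getChunk(0) here because, as the added comment notes, a realtime timeline entry always holds exactly one Sink. If that assumption were ever to be made explicit, a Guava-based alternative would fail fast instead of silently picking the first chunk; this is an illustration only, not part of the patch, and assumes Guava's Iterables is available in this class:

// Equivalent to getChunk(0).getObject() when there is exactly one chunk,
// but throws IllegalArgumentException if the single-chunk assumption is violated.
final Sink theSink = Iterables.getOnlyElement(holder.getObject()).getObject();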