diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 9d510c14300..d620d90dc85 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -25,20 +25,13 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Injector; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import com.metamx.common.guava.Yielder; -import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.TaskToolbox; @@ -46,30 +39,19 @@ import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.task.NoopTask; import io.druid.query.filter.DimFilter; -import io.druid.query.select.EventHolder; -import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; import io.druid.segment.IndexIO; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; -import io.druid.segment.column.Column; -import io.druid.segment.data.IndexedInts; 
-import io.druid.segment.filter.Filters; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; -import io.druid.utils.Runnables; -import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -286,7 +268,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory rowYielder; - - public IngestSegmentFirehose(List adapters, final List dims, final List metrics) - { - Sequence rows = Sequences.concat( - Iterables.transform( - adapters, new Function>() - { - @Nullable - @Override - public Sequence apply(StorageAdapter adapter) - { - return Sequences.concat( - Sequences.map( - adapter.makeCursors( - Filters.convertDimensionFilters(dimFilter), - interval, - QueryGranularity.ALL - ), new Function>() - { - @Nullable - @Override - public Sequence apply(final Cursor cursor) - { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - - final Map dimSelectors = Maps.newHashMap(); - for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); - } - } - - final Map metSelectors = Maps.newHashMap(); - for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); - if (metricSelector != null) { - metSelectors.put(metric, metricSelector); - } - } - - return Sequences.simple( - new Iterable() - { - @Override - public Iterator iterator() - { - return new 
Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public InputRow next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.get(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); - - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); - - if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - theEvent.put(metric, selector.get()); - } - cursor.advance(); - return new MapBasedInputRow(timestamp, dims, theEvent); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - }; - } - } - ); - } - } - ) - ); - } - } - ) - ); - rowYielder = rows.toYielder( - null, - new YieldingAccumulator() - { - @Override - public InputRow accumulate(InputRow accumulated, InputRow in) - { - yield(); - return in; - } - } - ); - } - - @Override - public boolean hasMore() - { - return !rowYielder.isDone(); - } - - @Override - public InputRow nextRow() - { - final InputRow inputRow = rowYielder.get(); - rowYielder = rowYielder.next(null); - return inputRow; - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - @Override - public void close() throws IOException - { - rowYielder.close(); - } - } } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 7bb22af7fc3..c632c84ad6d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -74,6 +74,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; @@ -461,8 +462,8 @@ public class IngestSegmentFirehoseFactoryTest { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose = - (IngestSegmentFirehoseFactory.IngestSegmentFirehose) + try (final IngestSegmentFirehose firehose = + (IngestSegmentFirehose) factory.connect(rowParser)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java new file mode 100644 index 00000000000..bc7da0bfcf2 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -0,0 +1,202 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.segment.realtime.firehose;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.guava.Yielder;
+import com.metamx.common.guava.YieldingAccumulator;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.filter.DimFilter;
+import io.druid.query.select.EventHolder;
+import io.druid.segment.Cursor;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.LongColumnSelector;
+import io.druid.segment.ObjectColumnSelector;
+import io.druid.segment.StorageAdapter;
+import io.druid.segment.column.Column;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.filter.Filters;
+import io.druid.utils.Runnables;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link Firehose} that replays rows read from a list of {@link StorageAdapter}s as {@link InputRow}s.
+ *
+ * <p>The per-adapter row sequences are concatenated. For every adapter, cursors are created from the supplied
+ * filter, interval and granularity, and every cursor position becomes one {@link MapBasedInputRow} holding the
+ * timestamp, the requested dimensions and the requested metrics.
+ *
+ * <p>{@link #nextRow()} reads and then reassigns the underlying yielder without any locking, so a single instance
+ * should not be driven by more than one thread at a time.
+ */
+public class IngestSegmentFirehose implements Firehose
+{
+  private volatile Yielder<InputRow> rowYielder;
+
+  /**
+   * Builds the concatenated row sequence over all adapters and opens a yielder on it.
+   *
+   * @param adapters    storage adapters to read rows from
+   * @param dims        dimension names to copy into each row; a name for which a cursor returns no selector is skipped
+   * @param metrics     metric names to copy into each row; a name for which a cursor returns no selector is skipped
+   * @param dimFilter   filter, converted with {@code Filters.convertDimensionFilters}, passed to every adapter
+   * @param interval    interval passed to every adapter when creating cursors
+   * @param granularity granularity passed to every adapter when creating cursors
+   */
+  public IngestSegmentFirehose(
+      List<StorageAdapter> adapters,
+      final List<String> dims,
+      final List<String> metrics,
+      final DimFilter dimFilter,
+      final Interval interval,
+      final QueryGranularity granularity
+  )
+  {
+    final Sequence<InputRow> rows = Sequences.concat(
+        Iterables.transform(
+            adapters,
+            new Function<StorageAdapter, Sequence<InputRow>>()
+            {
+              @Nullable
+              @Override
+              public Sequence<InputRow> apply(StorageAdapter adapter)
+              {
+                return Sequences.concat(
+                    Sequences.map(
+                        adapter.makeCursors(
+                            Filters.convertDimensionFilters(dimFilter),
+                            interval,
+                            granularity
+                        ),
+                        new Function<Cursor, Sequence<InputRow>>()
+                        {
+                          @Nullable
+                          @Override
+                          public Sequence<InputRow> apply(final Cursor cursor)
+                          {
+                            return makeRowSequence(cursor, dims, metrics);
+                          }
+                        }
+                    )
+                );
+              }
+            }
+        )
+    );
+
+    rowYielder = rows.toYielder(
+        null,
+        new YieldingAccumulator<InputRow, InputRow>()
+        {
+          @Override
+          public InputRow accumulate(InputRow accumulated, InputRow in)
+          {
+            // Qualified with "this" because a bare yield() call is rejected by Java 14+ compilers.
+            this.yield();
+            return in;
+          }
+        }
+    );
+  }
+
+  /**
+   * Wraps one cursor in a sequence that emits a {@link MapBasedInputRow} per cursor position.
+   *
+   * @param cursor  cursor to read; it is advanced once for every row handed out
+   * @param dims    dimension names to look up selectors for
+   * @param metrics metric names to look up selectors for
+   *
+   * @return a sequence over the remaining rows of {@code cursor}
+   */
+  private static Sequence<InputRow> makeRowSequence(
+      final Cursor cursor,
+      final List<String> dims,
+      final List<String> metrics
+  )
+  {
+    final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
+
+    final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
+    for (String dim : dims) {
+      final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null);
+      // dimSelector is null if the dimension is not present
+      if (dimSelector != null) {
+        dimSelectors.put(dim, dimSelector);
+      }
+    }
+
+    final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
+    for (String metric : metrics) {
+      final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
+      if (metricSelector != null) {
+        metSelectors.put(metric, metricSelector);
+      }
+    }
+
+    return Sequences.simple(
+        new Iterable<InputRow>()
+        {
+          @Override
+          public Iterator<InputRow> iterator()
+          {
+            return new Iterator<InputRow>()
+            {
+              @Override
+              public boolean hasNext()
+              {
+                return !cursor.isDone();
+              }
+
+              @Override
+              public InputRow next()
+              {
+                // Honour the Iterator contract instead of reading selectors of an exhausted cursor.
+                if (cursor.isDone()) {
+                  throw new NoSuchElementException();
+                }
+
+                final Map<String, Object> theEvent = Maps.newLinkedHashMap();
+                final long timestamp = timestampColumnSelector.get();
+                theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
+
+                for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
+                  final String dim = dimSelector.getKey();
+                  final DimensionSelector selector = dimSelector.getValue();
+                  final IndexedInts vals = selector.getRow();
+
+                  if (vals.size() == 1) {
+                    // A single value is stored as a plain String rather than a one-element list.
+                    final String dimVal = selector.lookupName(vals.get(0));
+                    theEvent.put(dim, dimVal);
+                  } else {
+                    // Zero or several values are stored as a (possibly empty) list.
+                    List<String> dimVals = Lists.newArrayList();
+                    for (int i = 0; i < vals.size(); ++i) {
+                      dimVals.add(selector.lookupName(vals.get(i)));
+                    }
+                    theEvent.put(dim, dimVals);
+                  }
+                }
+
+                for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
+                  final String metric = metSelector.getKey();
+                  final ObjectColumnSelector selector = metSelector.getValue();
+                  theEvent.put(metric, selector.get());
+                }
+
+                // All selectors have been read for this position; move on before handing the row out.
+                cursor.advance();
+                return new MapBasedInputRow(timestamp, dims, theEvent);
+              }
+
+              @Override
+              public void remove()
+              {
+                throw new UnsupportedOperationException("Remove Not Supported");
+              }
+            };
+          }
+        }
+    );
+  }
+
+  /**
+   * @return {@code true} while the underlying yielder is not done
+   */
+  @Override
+  public boolean hasMore()
+  {
+    return !rowYielder.isDone();
+  }
+
+  /**
+   * Returns the row currently held by the yielder and then advances the yielder.
+   *
+   * @return the current row
+   */
+  @Override
+  public InputRow nextRow()
+  {
+    final InputRow inputRow = rowYielder.get();
+    rowYielder = rowYielder.next(null);
+    return inputRow;
+  }
+
+  /**
+   * @return the no-op runnable supplied by {@link Runnables#getNoopRunnable()}
+   */
+  @Override
+  public Runnable commit()
+  {
+    return Runnables.getNoopRunnable();
+  }
+
+  /**
+   * Closes the underlying yielder.
+   *
+   * @throws IOException if closing the yielder fails
+   */
+  @Override
+  public void close() throws IOException
+  {
+    rowYielder.close();
+  }
+}