From 972c5dac31b68fc321cb2a408a516daadb6b9095 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 14 Jul 2014 21:17:53 +0530 Subject: [PATCH] improve memory usage and rename firehose --- .../IngestSegmentFirehoseFactory.java | 97 ++++++++++++------- 1 file changed, 61 insertions(+), 36 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 2d270ccdeb1..85f1c0b4c59 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -71,6 +71,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -138,7 +139,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory rowYielder; - public DruidFirehose(List adapters, final List dims, final List metrics) + public IngestSegmentFirehose(List adapters, final List dims, final List metrics) { Sequence rows = Sequences.concat( Iterables.transform( @@ -273,54 +274,78 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory apply(@Nullable Cursor cursor) + public Sequence apply(@Nullable final Cursor cursor) { - TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); + final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); - Map dimSelectors = Maps.newHashMap(); + final Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); dimSelectors.put(dim, dimSelector); } - Map metSelectors = Maps.newHashMap(); + final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); } - List rowList = Lists.newArrayList(); - while (!cursor.isDone()) { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.getTimestamp(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.getTimestamp(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); - if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + }; } - theEvent.put(dim, dimVals); } - } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - theEvent.put(metric, selector.get()); - } - rowList.add(new MapBasedInputRow(timestamp, dims, theEvent)); - cursor.advance(); - } - return Sequences.simple(rowList); + ); } } )