improve memory usage and rename firehose

This commit is contained in:
nishantmonu51 2014-07-14 21:17:53 +05:30
parent 7168adcca7
commit 972c5dac31
1 changed files with 61 additions and 36 deletions

View File

@ -71,6 +71,7 @@ import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -138,7 +139,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
@Override @Override
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{ {
log.info("Connecting firehose: DruidFirehose[%s,%s]", dataSource, interval); log.info("Connecting firehose: IngestSegmentFirehose[%s,%s]", dataSource, interval);
// A better way to achieve this would be to pass the toolbox to the Firehose; the instance is initialized lazily in the connect method. // A better way to achieve this would be to pass the toolbox to the Firehose; the instance is initialized lazily in the connect method.
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build(
new IngestTask("Ingest-Task-Id", dataSource) new IngestTask("Ingest-Task-Id", dataSource)
@ -204,7 +205,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
} }
); );
return new DruidFirehose(adapters, dims, metricsList); return new IngestSegmentFirehose(adapters, dims, metricsList);
} }
catch (IOException e) { catch (IOException e) {
@ -249,11 +250,11 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
} }
} }
public class DruidFirehose implements Firehose public class IngestSegmentFirehose implements Firehose
{ {
private volatile Yielder<InputRow> rowYielder; private volatile Yielder<InputRow> rowYielder;
public DruidFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics) public IngestSegmentFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics)
{ {
Sequence<InputRow> rows = Sequences.concat( Sequence<InputRow> rows = Sequences.concat(
Iterables.transform( Iterables.transform(
@ -273,54 +274,78 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
{ {
@Nullable @Nullable
@Override @Override
public Sequence<InputRow> apply(@Nullable Cursor cursor) public Sequence<InputRow> apply(@Nullable final Cursor cursor)
{ {
TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
Map<String, DimensionSelector> dimSelectors = Maps.newHashMap(); final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) { for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim, dimSelector); dimSelectors.put(dim, dimSelector);
} }
Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap(); final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) { for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector); metSelectors.put(metric, metricSelector);
} }
List<InputRow> rowList = Lists.newArrayList(); return Sequences.simple(
while (!cursor.isDone()) { new Iterable<InputRow>()
final Map<String, Object> theEvent = Maps.newLinkedHashMap(); {
final long timestamp = timestampColumnSelector.getTimestamp(); @Override
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) { @Override
final String dim = dimSelector.getKey(); public InputRow next()
final DimensionSelector selector = dimSelector.getValue(); {
final IndexedInts vals = selector.getRow(); final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
if (vals.size() == 1) { for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dimVal = selector.lookupName(vals.get(0)); final String dim = dimSelector.getKey();
theEvent.put(dim, dimVal); final DimensionSelector selector = dimSelector.getValue();
} else { final IndexedInts vals = selector.getRow();
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) { if (vals.size() == 1) {
dimVals.add(selector.lookupName(vals.get(i))); final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
} }
theEvent.put(dim, dimVals);
} }
} );
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
rowList.add(new MapBasedInputRow(timestamp, dims, theEvent));
cursor.advance();
}
return Sequences.simple(rowList);
} }
} }
) )