From c216eb73407bc6b5534f93825d41ce69542e4095 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 21 Aug 2014 23:56:07 +0530 Subject: [PATCH] remove ThreadSafeAggregator --- .../segment/incremental/IncrementalIndex.java | 269 ++++++++---------- 1 file changed, 111 insertions(+), 158 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 124035e612f..da9fe613f42 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -80,9 +80,7 @@ public class IncrementalIndex implements Iterable, Closeable { private final long minTimestamp; private final QueryGranularity gran; - private final List> rowTransformers; - private final AggregatorFactory[] metrics; private final Map metricIndexes; private final Map metricTypes; @@ -93,9 +91,7 @@ public class IncrementalIndex implements Iterable, Closeable private final LinkedHashMap dimensionOrder; private final CopyOnWriteArrayList dimensions; private final DimensionHolder dimValues; - private final Map columnCapabilities; - private final ConcurrentSkipListMap facts; private final ResourceHolder bufferHolder; private volatile AtomicInteger numEntries = new AtomicInteger(); @@ -130,148 +126,146 @@ public class IncrementalIndex implements Iterable, Closeable int currAggSize = 0; for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; - aggs[i] = new ThreadSafeAggregator( - agg.factorizeBuffered( - new ColumnSelectorFactory() + aggs[i] = agg.factorizeBuffered( + new ColumnSelectorFactory() + { + @Override + public TimestampColumnSelector makeTimestampColumnSelector() + { + return new TimestampColumnSelector() { @Override - public TimestampColumnSelector makeTimestampColumnSelector() + public long getTimestamp() { - return new TimestampColumnSelector() - { - @Override - public long getTimestamp() - { - return in.get().getTimestampFromEpoch(); - } - }; + return in.get().getTimestampFromEpoch(); + } + }; + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + final String metricName = columnName.toLowerCase(); + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(metricName); + } + }; + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String column) + { + final String typeName = agg.getTypeName(); + final String columnName = column.toLowerCase(); + + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; } @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) + public Object get() { - final String metricName = columnName.toLowerCase(); - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(metricName); - } - }; + return in.get().getRaw(columnName); + } + }; + + if (!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; } - @Override - public ObjectColumnSelector makeObjectColumnSelector(String column) + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } + + final ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() { - final String typeName = agg.getTypeName(); - final String columnName = column.toLowerCase(); - - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + @Override + public Class classOfObject() { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return in.get().getRaw(columnName); - } - }; - - if (!deserializeComplexMetrics) { - return rawColumnSelector; - } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in.get(), columnName); - } - }; + return extractor.extractedClass(); } - } + @Override + public Object get() + { + return extractor.extractValue(in.get(), columnName); + } + }; + } + } + + @Override + public DimensionSelector makeDimensionSelector(final String dimension) + { + final String dimensionName = dimension.toLowerCase(); + return new DimensionSelector() + { @Override - public DimensionSelector makeDimensionSelector(final String dimension) + public IndexedInts getRow() { - final String dimensionName = dimension.toLowerCase(); - return new DimensionSelector() + final List dimensionValues = in.get().getDimension(dimensionName); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } + + return new IndexedInts() { @Override - public IndexedInts getRow() + public int size() { - final List dimensionValues = in.get().getDimension(dimensionName); - final ArrayList vals = Lists.newArrayList(); - if (dimensionValues != null) { - for (int i = 0; i < dimensionValues.size(); ++i) { - vals.add(i); - } - } - - return new IndexedInts() - { - @Override - public int size() - { - return vals.size(); - } - - @Override - public int get(int index) - { - return vals.get(index); - } - - @Override - public Iterator iterator() - { - return vals.iterator(); - } - }; + return vals.size(); } @Override - public int getValueCardinality() + public int get(int index) { - throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + return vals.get(index); } @Override - public String lookupName(int id) + public Iterator iterator() { - return in.get().getDimension(dimensionName).get(id); - } - - @Override - public int lookupId(String name) - { - return in.get().getDimension(dimensionName).indexOf(name); + return vals.iterator(); } }; } - } - ) + + @Override + public int getValueCardinality() + { + throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + } + + @Override + public String lookupName(int id) + { + return in.get().getDimension(dimensionName).get(id); + } + + @Override + public int lookupId(String name) + { + return in.get().getDimension(dimensionName).indexOf(name); + } + }; + } + } ); aggPositionOffsets[i] = currAggSize; currAggSize += agg.getMaxIntermediateSize(); @@ -861,45 +855,4 @@ public class IncrementalIndex implements Iterable, Closeable } } } - - private static class ThreadSafeAggregator implements BufferAggregator - { - - private final BufferAggregator delegate; - - public ThreadSafeAggregator(BufferAggregator delegate) - { - this.delegate = delegate; - } - - @Override - public synchronized void init(ByteBuffer buf, int position) - { - delegate.init(buf, position); - } - - @Override - public synchronized void aggregate(ByteBuffer buf, int position) - { - delegate.aggregate(buf, position); - } - - @Override - public synchronized Object get(ByteBuffer buf, int position) - { - return delegate.get(buf, position); - } - - @Override - public synchronized float getFloat(ByteBuffer buf, int position) - { - return delegate.getFloat(buf, position); - } - - @Override - public synchronized void close() - { - delegate.close(); - } - } }