remove ThreadSafeAggregator

This commit is contained in:
nishantmonu51 2014-08-21 23:56:07 +05:30
parent 67f4bbae74
commit c216eb7340
1 changed files with 111 additions and 158 deletions

View File

@ -80,9 +80,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
{ {
private final long minTimestamp; private final long minTimestamp;
private final QueryGranularity gran; private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers; private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics; private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes; private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes; private final Map<String, String> metricTypes;
@ -93,9 +91,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
private final LinkedHashMap<String, Integer> dimensionOrder; private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions; private final CopyOnWriteArrayList<String> dimensions;
private final DimensionHolder dimValues; private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities; private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentSkipListMap<TimeAndDims, Integer> facts; private final ConcurrentSkipListMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder; private final ResourceHolder<ByteBuffer> bufferHolder;
private volatile AtomicInteger numEntries = new AtomicInteger(); private volatile AtomicInteger numEntries = new AtomicInteger();
@ -130,148 +126,146 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
int currAggSize = 0; int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) { for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i]; final AggregatorFactory agg = metrics[i];
aggs[i] = new ThreadSafeAggregator( aggs[i] = agg.factorizeBuffered(
agg.factorizeBuffered( new ColumnSelectorFactory()
new ColumnSelectorFactory() {
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
return new TimestampColumnSelector()
{ {
@Override @Override
public TimestampColumnSelector makeTimestampColumnSelector() public long getTimestamp()
{ {
return new TimestampColumnSelector() return in.get().getTimestampFromEpoch();
{ }
@Override };
public long getTimestamp() }
{
return in.get().getTimestampFromEpoch(); @Override
} public FloatColumnSelector makeFloatColumnSelector(String columnName)
}; {
final String metricName = columnName.toLowerCase();
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(metricName);
}
};
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String column)
{
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
} }
@Override @Override
public FloatColumnSelector makeFloatColumnSelector(String columnName) public Object get()
{ {
final String metricName = columnName.toLowerCase(); return in.get().getRaw(columnName);
return new FloatColumnSelector() }
{ };
@Override
public float get() if (!deserializeComplexMetrics) {
{ return rawColumnSelector;
return in.get().getFloatMetric(metricName); } else {
} if (typeName.equals("float")) {
}; return rawColumnSelector;
} }
@Override final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
public ObjectColumnSelector makeObjectColumnSelector(String column) if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{ {
final String typeName = agg.getTypeName(); @Override
final String columnName = column.toLowerCase(); public Class classOfObject()
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{ {
@Override return extractor.extractedClass();
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
return in.get().getRaw(columnName);
}
};
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), columnName);
}
};
} }
}
@Override
public Object get()
{
return extractor.extractValue(in.get(), columnName);
}
};
}
}
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
final String dimensionName = dimension.toLowerCase();
return new DimensionSelector()
{
@Override @Override
public DimensionSelector makeDimensionSelector(final String dimension) public IndexedInts getRow()
{ {
final String dimensionName = dimension.toLowerCase(); final List<String> dimensionValues = in.get().getDimension(dimensionName);
return new DimensionSelector() final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{ {
@Override @Override
public IndexedInts getRow() public int size()
{ {
final List<String> dimensionValues = in.get().getDimension(dimensionName); return vals.size();
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
} }
@Override @Override
public int getValueCardinality() public int get(int index)
{ {
throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); return vals.get(index);
} }
@Override @Override
public String lookupName(int id) public Iterator<Integer> iterator()
{ {
return in.get().getDimension(dimensionName).get(id); return vals.iterator();
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimensionName).indexOf(name);
} }
}; };
} }
}
) @Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimensionName).get(id);
}
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimensionName).indexOf(name);
}
};
}
}
); );
aggPositionOffsets[i] = currAggSize; aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize(); currAggSize += agg.getMaxIntermediateSize();
@ -861,45 +855,4 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
} }
} }
} }
private static class ThreadSafeAggregator implements BufferAggregator
{
private final BufferAggregator delegate;
public ThreadSafeAggregator(BufferAggregator delegate)
{
this.delegate = delegate;
}
@Override
public synchronized void init(ByteBuffer buf, int position)
{
delegate.init(buf, position);
}
@Override
public synchronized void aggregate(ByteBuffer buf, int position)
{
delegate.aggregate(buf, position);
}
@Override
public synchronized Object get(ByteBuffer buf, int position)
{
return delegate.get(buf, position);
}
@Override
public synchronized float getFloat(ByteBuffer buf, int position)
{
return delegate.getFloat(buf, position);
}
@Override
public synchronized void close()
{
delegate.close();
}
}
} }