a whole bunch of cleanup and fixes

fjy 2014-12-02 17:32:05 -08:00
parent b65933ffb8
commit bc173d14fc
8 changed files with 698 additions and 1120 deletions

IncrementalIndex.java

@@ -20,71 +20,599 @@
package io.druid.segment.incremental;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 */
-public interface IncrementalIndex extends Iterable<Row>, Closeable
+public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>, Closeable
{
protected static ColumnSelectorFactory makeColumnSelectorFactory(
final AggregatorFactory agg,
final ThreadLocal<InputRow> in,
final boolean deserializeComplexMetrics
)
{
return new ColumnSelectorFactory()
{
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getTimestampFromEpoch();
}
};
}
return new LongColumnSelector()
{
@Override
public long get()
{
return in.get().getLongMetric(columnName);
}
};
}
-List<String> getDimensions();
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
{
@Override
public float get()
{
return in.get().getFloatMetric(columnName);
}
};
}
-ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
@Override
public ObjectColumnSelector makeObjectColumnSelector(final String column)
{
final String typeName = agg.getTypeName();
-Integer getDimensionIndex(String dimension);
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
-List<String> getMetricNames();
@Override
public Object get()
{
return in.get().getRaw(column);
}
};
-DimDim getDimension(String dimension);
if (!deserializeComplexMetrics) {
return rawColumnSelector;
} else {
if (typeName.equals("float")) {
return rawColumnSelector;
}
-AggregatorFactory[] getMetricAggs();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
-Interval getInterval();
final ComplexMetricExtractor extractor = serde.getExtractor();
return new ObjectColumnSelector()
{
@Override
public Class classOfObject()
{
return extractor.extractedClass();
}
-DateTime getMinTime();
@Override
public Object get()
{
return extractor.extractValue(in.get(), column);
}
};
}
}
-DateTime getMaxTime();
@Override
public DimensionSelector makeDimensionSelector(final String dimension)
{
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
-boolean isEmpty();
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
-ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end);
@Override
public int get(int index)
{
return vals.get(index);
}
-Integer getMetricIndex(String columnName);
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
-String getMetricType(String metric);
@Override
public int getValueCardinality()
{
throw new UnsupportedOperationException("value cardinality is unknown in incremental index");
}
-ColumnCapabilities getCapabilities(String column);
@Override
public String lookupName(int id)
{
return in.get().getDimension(dimension).get(id);
}
-int size();
@Override
public int lookupId(String name)
{
return in.get().getDimension(dimension).indexOf(name);
}
};
}
};
}
-float getMetricFloatValue(int rowOffset, int aggOffset);
private final long minTimestamp;
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final AggregatorType[] aggs;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final boolean deserializeComplexMetrics;
-long getMetricLongValue(int rowOffset, int aggOffset);
protected final CopyOnWriteArrayList<String> dimensions;
-Object getMetricObjectValue(int rowOffset, int aggOffset);
private volatile AtomicInteger numEntries = new AtomicInteger();
-Iterable<Row> iterableWithPostAggregations(List<PostAggregator> postAggregatorSpecs);
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
-int add(InputRow inputRow);
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* @param incrementalIndexSchema the schema to use for incremental index
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
*/
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Lists.newCopyOnWriteArrayList();
this.deserializeComplexMetrics = deserializeComplexMetrics;
-void close();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = initAggs(metrics, in, deserializeComplexMetrics);
-InputRow formatRow(InputRow parse);
for (int i = 0; i < metrics.length; i++) {
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
metricNames = metricNamesBuilder.build();
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>();
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
}
for (String dimension : dimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.dimValues = new DimensionHolder();
}
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
protected abstract DimDim makeDimDim(String dimension);
protected abstract AggregatorType[] initAggs(
AggregatorFactory[] metrics,
ThreadLocal<InputRow> in,
boolean deserializeComplexMetrics
);
protected abstract Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
);
protected abstract AggregatorType[] getAggsForRow(int rowOffset);
protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);
protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
@Override
public void close()
{
// Nothing to close
}
public InputRow formatRow(InputRow row)
{
for (Function<InputRow, InputRow> rowTransformer : rowTransformers) {
row = rowTransformer.apply(row);
}
if (row == null) {
throw new IAE("Row is null? How can this be?!");
}
return row;
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p/>
* <p/>
* Calls to add() are thread safe.
* <p/>
*
* @param row the row of data to add
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
if (dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
}
dims = newDims;
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
return addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in);
}
public boolean isEmpty()
{
return numEntries.get() == 0;
}
public int size()
{
return numEntries.get();
}
private long getMinTimeMillis()
{
return getFacts().firstKey().getTimestamp();
}
private long getMaxTimeMillis()
{
return getFacts().lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
public AggregatorType[] getAggs()
{
return aggs;
}
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
public DimensionHolder getDimValues()
{
return dimValues;
}
public List<String> getDimensions()
{
return dimensions;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
}
public DateTime getMinTime()
{
return isEmpty() ? null : new DateTime(getMinTimeMillis());
}
public DateTime getMaxTime()
{
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
public DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
public List<String> getMetricNames()
{
return metricNames;
}
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
public ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return getFacts().subMap(start, end);
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
}
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
getFacts().entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
}
AggregatorType[] aggs = getAggsForRow(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
}
if (postAggs != null) {
for (PostAggregator postAgg : postAggs) {
theVals.put(postAgg.getName(), postAgg.compute(theVals));
}
}
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);
}
};
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = makeDimDim(dimension);
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
static interface DimDim
{
@@ -189,8 +717,7 @@ public interface IncrementalIndex extends Iterable<Row>, Closeable
return Arrays.asList(input);
}
}
-) +
-'}';
+) + '}';
}
}
}
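The structural change in this file: what was an interface, with each implementation duplicating the bookkeeping, is now a generic abstract class that owns the shared logic (schema handling, metric/dimension ordering, column capabilities, the add() flow) and delegates storage to subclass hooks (initAggs, addToFacts, makeDimDim, getAggsForRow, getAggVal). A minimal sketch of that template-method shape, using hypothetical names rather than Druid's actual classes:

import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a generic base owns shared bookkeeping; subclasses pick the
// aggregator representation (the type parameter) and the storage strategy.
abstract class SketchIndex<AggType>
{
  private final AtomicInteger numEntries = new AtomicInteger();

  // Shared entry point: validation and bookkeeping live here once;
  // storage is delegated to the subclass hook.
  public int add(String row)
  {
    if (row == null) {
      throw new IllegalArgumentException("row is null");
    }
    return addToFacts(row, numEntries);
  }

  // Hook: each storage strategy decides how aggregation state is persisted.
  protected abstract int addToFacts(String row, AtomicInteger numEntries);
}

class SketchOnHeapIndex extends SketchIndex<Object>
{
  @Override
  protected int addToFacts(String row, AtomicInteger numEntries)
  {
    // On-heap strategy: state lives in ordinary objects, so just count.
    return numEntries.incrementAndGet();
  }
}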

IncrementalIndexAdapter.java

@@ -46,11 +46,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval;
-private final IncrementalIndex index;
+private final IncrementalIndex<Object> index;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
public IncrementalIndexAdapter(
-Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory
+Interval dataInterval, IncrementalIndex<Object> index, BitmapFactory bitmapFactory
)
{
this.dataInterval = dataInterval;

OffheapIncrementalIndex.java

@@ -19,41 +19,14 @@
package io.druid.segment.incremental;

-import com.google.common.base.Function;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
-import io.druid.data.input.MapBasedRow;
-import io.druid.data.input.Row;
-import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.segment.ColumnSelectorFactory;
-import io.druid.segment.DimensionSelector;
-import io.druid.segment.FloatColumnSelector;
-import io.druid.segment.LongColumnSelector;
-import io.druid.segment.ObjectColumnSelector;
-import io.druid.segment.column.Column;
-import io.druid.segment.column.ColumnCapabilities;
-import io.druid.segment.column.ColumnCapabilitiesImpl;
-import io.druid.segment.column.ValueType;
-import io.druid.segment.data.IndexedInts;
-import io.druid.segment.serde.ComplexMetricExtractor;
-import io.druid.segment.serde.ComplexMetricSerde;
-import io.druid.segment.serde.ComplexMetrics;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
@@ -65,273 +38,53 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

-public class OffheapIncrementalIndex implements IncrementalIndex
+/**
+ */
+public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
-private final long minTimestamp;
-private final QueryGranularity gran;
-private final List<Function<InputRow, InputRow>> rowTransformers;
-private final AggregatorFactory[] metrics;
-private final Map<String, Integer> metricIndexes;
-private final Map<String, String> metricTypes;
-private final ImmutableList<String> metricNames;
-private final BufferAggregator[] aggs;
-private final int[] aggPositionOffsets;
-private final int totalAggSize;
-private final LinkedHashMap<String, Integer> dimensionOrder;
-protected final CopyOnWriteArrayList<String> dimensions;
-private final DimensionHolder dimValues;
-private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
-private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
-private volatile AtomicInteger numEntries = new AtomicInteger();
-// This is modified on add() in a critical section.
-private ThreadLocal<InputRow> in = new ThreadLocal<>();
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
private final DB factsDb;
+private final int[] aggPositionOffsets;
+private final int totalAggSize;
+private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
-/**
- * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
- * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
- *
- * @param incrementalIndexSchema
- * @param bufferPool
- * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
- *        value for aggregators that return metrics other than float.
- */
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
-final boolean deserializeComplexMetrics
+boolean deserializeComplexMetrics
)
{
-this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
-this.gran = incrementalIndexSchema.getGran();
-this.metrics = incrementalIndexSchema.getMetrics();
-this.rowTransformers = Lists.newCopyOnWriteArrayList();
-final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
-final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
-final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
-this.aggs = new BufferAggregator[metrics.length];
+super(incrementalIndexSchema, deserializeComplexMetrics);
+this.bufferHolder = bufferPool.take();
+final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics();
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
-aggs[i] = agg.factorizeBuffered(
-new ColumnSelectorFactory()
-{
- ... (inline anonymous ColumnSelectorFactory, identical to the implementation added as IncrementalIndex.makeColumnSelectorFactory above, elided) ...
-}
-);
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
-final String metricName = metrics[i].getName();
-metricNamesBuilder.add(metricName);
-metricIndexesBuilder.put(metricName, i);
-metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
-metricNames = metricNamesBuilder.build();
-metricIndexes = metricIndexesBuilder.build();
-metricTypes = metricTypesBuilder.build();
this.totalAggSize = currAggSize;
- ... (dimension order, spatial-dimension, and column-capability initialization, identical to the logic moved into the IncrementalIndex constructor above, elided) ...
-this.bufferHolder = bufferPool.take();
-this.dimValues = new DimensionHolder();
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
-factsDb = dbMaker.make();
-db = dbMaker.make();
+this.factsDb = dbMaker.make();
+this.db = dbMaker.make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
@@ -340,7 +93,6 @@ public class OffheapIncrementalIndex implements IncrementalIndex
.make();
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
@@ -360,85 +112,45 @@ public class OffheapIncrementalIndex implements IncrementalIndex
}
@Override
-public InputRow formatRow(InputRow row)
-{
- ... (formatRow and the documented add(InputRow) implementation, moved verbatim into the IncrementalIndex base class, elided) ...
-}
+public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
+{
+return facts;
+}
+@Override
+protected DimDim makeDimDim(String dimension)
+{
+return new OffheapDimDim(dimension);
+}
+@Override
+protected BufferAggregator[] initAggs(
+AggregatorFactory[] metrics,
+ThreadLocal<InputRow> in,
+boolean deserializeComplexMetrics
+)
+{
+BufferAggregator[] aggs = new BufferAggregator[metrics.length];
+for (int i = 0; i < metrics.length; i++) {
+final AggregatorFactory agg = metrics[i];
+aggs[i] = agg.factorizeBuffered(
+makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
+);
+}
+return aggs;
+}
+@Override
+protected Integer addToFacts(
+AggregatorFactory[] metrics,
+boolean deserializeComplexMetrics,
+InputRow row,
+AtomicInteger numEntries,
+TimeAndDims key,
+ThreadLocal<InputRow> in
+)
+{
+final BufferAggregator[] aggs = getAggs();
Integer rowOffset;
synchronized (this) {
rowOffset = totalAggSize * numEntries.get();
@@ -467,195 +179,33 @@ public class OffheapIncrementalIndex implements IncrementalIndex
}
@Override
-public boolean isEmpty()
+protected BufferAggregator[] getAggsForRow(int rowOffset)
{
-return numEntries.get() == 0;
+return getAggs();
}
@Override
-public int size()
+protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
-return numEntries.get();
-}
- ... (getMinTimeMillis, getMaxTimeMillis, getDimVals, and the getter methods now inherited from IncrementalIndex, elided) ...
-private int getMetricPosition(int rowOffset, int metricIndex)
-{
-return rowOffset + aggPositionOffsets[metricIndex];
-}
+return agg.get(bufferHolder.get(), getMetricPosition(rowOffset, aggPosition));
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
-return aggs[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
+return getAggs()[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
-return aggs[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
+return getAggs()[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
-return aggs[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
+return getAggs()[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
- ... (getCapabilities, getFacts, getSubMap, iterator, and iterableWithPostAggregations, all moved into the IncrementalIndex base class, elided) ...
@Override
@@ -669,34 +219,18 @@ public class OffheapIncrementalIndex implements IncrementalIndex
}
}
-class DimensionHolder
+private int getMetricPosition(int rowOffset, int metricIndex)
{
-private final Map<String, DimDim> dimensions;
-DimensionHolder()
-{
-dimensions = Maps.newConcurrentMap();
+return rowOffset + aggPositionOffsets[metricIndex];
}
-DimDim add(String dimension)
+private DimDim getDimDim(int dimIndex)
{
-DimDim holder = dimensions.get(dimension);
-if (holder == null) {
-holder = new OffheapDimDim(dimension);
-dimensions.put(dimension, holder);
-} else {
-throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
-}
-return holder;
+return getDimValues().get(getDimensions().get(dimIndex));
}
-DimDim get(String dimension)
-{
-return dimensions.get(dimension);
-}
-}
-public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
+// MapDB forces serializers to implement serializable, which sucks
+private static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
private final TimeAndDimsComparator comparator;
private final transient OffheapIncrementalIndex incrementalIndex;
@@ -719,7 +253,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex
if (dims == null) {
out.write(-1);
} else {
-DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
+DimDim dimDim = incrementalIndex.getDimDim(index);
out.writeInt(dims.length);
for (String value : dims) {
out.writeInt(dimDim.getId(value));
@@ -740,7 +274,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex
for (int k = 0; k < dims.length; k++) {
int len = in.readInt();
if (len != -1) {
-DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
+DimDim dimDim = incrementalIndex.getDimDim(k);
String[] col = new String[len];
for (int l = 0; l < col.length; l++) {
col[l] = dimDim.get(dimDim.getValue(in.readInt()));
@@ -760,7 +294,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex
}
}
-public static class TimeAndDimsComparator implements Comparator, Serializable
+private static class TimeAndDimsComparator implements Comparator, Serializable
{
@Override
public int compare(Object o1, Object o2)
@@ -773,8 +307,8 @@ public class OffheapIncrementalIndex implements IncrementalIndex
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
-private final WeakHashMap<String, WeakReference<String>> cache =
-new WeakHashMap();
+private final WeakHashMap<String, WeakReference<String>> cache = new WeakHashMap();
private volatile String[] sortedVals = null;
// size on MapDB is slow so maintain a count here
private volatile int size = 0;
@@ -874,5 +408,4 @@ public class OffheapIncrementalIndex implements IncrementalIndex
return s1.equals(s2);
}
}
}
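One detail worth calling out in this file: buffer aggregators share a single off-heap ByteBuffer, with each row occupying totalAggSize bytes and each aggregator claiming a fixed slice of that row via aggPositionOffsets, so getMetricPosition(rowOffset, i) is simply rowOffset + aggPositionOffsets[i]. A small self-contained sketch of that arithmetic (the sizes and row index below are made up for illustration):

// Sketch of the off-heap row layout used above; all values are hypothetical.
public class OffsetLayoutSketch
{
  public static void main(String[] args)
  {
    int[] maxIntermediateSizes = {8, 4, 16}; // per-aggregator byte budgets

    // Mirrors the constructor loop: each aggregator gets a fixed offset
    // within a row, and the row width is the sum of all budgets.
    int[] aggPositionOffsets = new int[maxIntermediateSizes.length];
    int totalAggSize = 0;
    for (int i = 0; i < maxIntermediateSizes.length; i++) {
      aggPositionOffsets[i] = totalAggSize;
      totalAggSize += maxIntermediateSizes[i];
    }

    // Mirrors addToFacts()/getMetricPosition(): a row's base offset is
    // totalAggSize * rowIndex, and aggregator i lives at base + offsets[i].
    int rowIndex = 7;
    int rowOffset = totalAggSize * rowIndex;
    int positionOfAgg2 = rowOffset + aggPositionOffsets[2];
    System.out.println(positionOfAgg2); // 7 * 28 + 12 = 208
  }
}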

OnheapIncrementalIndex.java

@@ -19,144 +19,51 @@
package io.druid.segment.incremental;

-import com.google.common.base.Function;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
-import io.druid.data.input.MapBasedRow;
-import io.druid.data.input.Row;
-import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.segment.ColumnSelectorFactory;
-import io.druid.segment.DimensionSelector;
-import io.druid.segment.FloatColumnSelector;
-import io.druid.segment.LongColumnSelector;
-import io.druid.segment.ObjectColumnSelector;
-import io.druid.segment.column.Column;
-import io.druid.segment.column.ColumnCapabilities;
-import io.druid.segment.column.ColumnCapabilitiesImpl;
-import io.druid.segment.column.ValueType;
-import io.druid.segment.data.IndexedInts;
-import io.druid.segment.serde.ComplexMetricExtractor;
-import io.druid.segment.serde.ComplexMetricSerde;
-import io.druid.segment.serde.ComplexMetrics;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 */
-public class OnheapIncrementalIndex implements IncrementalIndex
+public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
-private final long minTimestamp;
-private final QueryGranularity gran;
-private final List<Function<InputRow, InputRow>> rowTransformers;
-private final AggregatorFactory[] metrics;
-private final Map<String, Integer> metricIndexes;
-private final Map<String, String> metricTypes;
-private final ImmutableList<String> metricNames;
-private final LinkedHashMap<String, Integer> dimensionOrder;
-protected final CopyOnWriteArrayList<String> dimensions;
-private final DimensionHolder dimValues;
-private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
-private final List<Aggregator[]> aggList;
-private volatile AtomicInteger numEntries = new AtomicInteger();
-// This is modified on add() in a critical section.
-private ThreadLocal<InputRow> in = new ThreadLocal<>();
-private final boolean deserializeComplexMetrics;
+private final List<Aggregator[]> aggList = Lists.newArrayList();
+public OnheapIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics)
+{
+super(incrementalIndexSchema, deserializeComplexMetrics);
+this.facts = new ConcurrentSkipListMap<>();
+}
-/**
- * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
- * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
- *
- * @param incrementalIndexSchema
- * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
- *        value for aggregators that return metrics other than float.
- */
public OnheapIncrementalIndex(
-IncrementalIndexSchema incrementalIndexSchema,
-final boolean deserializeComplexMetrics
+long minTimestamp,
+QueryGranularity gran,
+final AggregatorFactory[] metrics,
+boolean deserializeComplexMetrics
)
{
-this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
-this.gran = incrementalIndexSchema.getGran();
-this.metrics = incrementalIndexSchema.getMetrics();
-this.rowTransformers = Lists.newCopyOnWriteArrayList();
-final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
-final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
-final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
-this.aggList = Lists.newArrayList();
- ... (metric-name/index/type bookkeeping, dimension and column-capability setup, identical to the logic moved into the IncrementalIndex constructor, elided) ...
-this.facts = new ConcurrentSkipListMap<>();
-this.deserializeComplexMetrics = deserializeComplexMetrics;
+this(
+new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
+.withQueryGranularity(gran)
+.withMetrics(metrics)
+.build(),
+deserializeComplexMetrics
+);
}
public OnheapIncrementalIndex(
@@ -181,102 +88,36 @@ public class OnheapIncrementalIndex implements IncrementalIndex
this(incrementalIndexSchema, true);
}
-public OnheapIncrementalIndex(
-long minTimestamp,
-QueryGranularity gran,
-final AggregatorFactory[] metrics,
-boolean deserializeComplexMetrics
-)
-{
-this(
-new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
-.withQueryGranularity(gran)
-.withMetrics(metrics)
-.build(),
-deserializeComplexMetrics
-);
-}
+@Override
+public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
+{
+return facts;
+}
+@Override
+protected DimDim makeDimDim(String dimension)
+{
+return new OnHeapDimDim();
+}
+@Override
+protected Aggregator[] initAggs(
+AggregatorFactory[] metrics, ThreadLocal<InputRow> in, boolean deserializeComplexMetrics
+)
+{
+return new Aggregator[metrics.length];
+}
@Override
-public InputRow formatRow(InputRow row)
-{
- ... (formatRow and the documented add(InputRow) implementation, moved verbatim into the IncrementalIndex base class, elided) ...
-}
+protected Integer addToFacts(
+AggregatorFactory[] metrics,
+boolean deserializeComplexMetrics,
+InputRow row,
+AtomicInteger numEntries,
+TimeAndDims key,
+ThreadLocal<InputRow> in
+)
+{
Integer rowOffset;
synchronized (this) {
rowOffset = numEntries.get();
@@ -288,160 +129,17 @@ public class OnheapIncrementalIndex implements IncrementalIndex
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
-new ColumnSelectorFactory()
-{
- ... (inline anonymous ColumnSelectorFactory, identical to the implementation added as IncrementalIndex.makeColumnSelectorFactory, elided) ...
-}
+makeColumnSelectorFactory(agg, in, deserializeComplexMetrics)
);
}
aggList.add(aggs);
numEntries.incrementAndGet();
}
}
in.set(row);
-Aggregator[] aggs = aggList.get(rowOffset);
+final Aggregator[] aggs = aggList.get(rowOffset);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate();
@@ -452,15 +150,15 @@ public class OnheapIncrementalIndex implements IncrementalIndex
}
@Override
-public boolean isEmpty()
+protected Aggregator[] getAggsForRow(int rowOffset)
{
-return numEntries.get() == 0;
+return aggList.get(rowOffset);
}
@Override
-public int size()
+protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition)
{
-return numEntries.get();
+return agg.get();
}
@Override
@@ -481,196 +179,14 @@ public class OnheapIncrementalIndex implements IncrementalIndex
return aggList.get(rowOffset)[aggOffset].get();
}
private long getMinTimeMillis() private static class OnHeapDimDim implements DimDim
{
return facts.firstKey().getTimestamp();
}
private long getMaxTimeMillis()
{
return facts.lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
@Override
public AggregatorFactory[] getMetricAggs()
{
return metrics;
}
@Override
public List<String> getDimensions()
{
return dimensions;
}
@Override
public String getMetricType(String metric)
{
return metricTypes.get(metric);
}
public Interval getInterval()
{
return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis()));
}
public DateTime getMinTime()
{
return isEmpty() ? null : new DateTime(getMinTimeMillis());
}
public DateTime getMaxTime()
{
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
public DimDim getDimension(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
}
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
}
public List<String> getMetricNames()
{
return metricNames;
}
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
}
public ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return facts.subMap(start, end);
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null).iterator();
}
@Override
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
}
Aggregator[] aggs = aggList.get(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), aggs[i].get());
}
if (postAggs != null) {
for (PostAggregator postAgg : postAggs) {
theVals.put(postAgg.getName(), postAgg.compute(theVals));
}
}
return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
}
}
);
}
};
}
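iterableWithPostAggregations() materializes each fact as a MapBasedRow keyed by dimension and metric name, layering any post-aggregators on at read time; passing null, as the plain iterator() above does, skips that last step. A short usage sketch (the index variable is hypothetical):

// Sketch only: draining the index without post-aggregation.
for (Row row : index.iterableWithPostAggregations(null)) {
  final long timestamp = row.getTimestampFromEpoch();
  // dimension and metric values are addressable by name on the Row
}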
@Override
public void close()
{
// Nothing to close
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new DimDimImpl();
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
private static class DimDimImpl implements DimDim
private static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
public DimDimImpl()
public OnHeapDimDim()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
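The synchronized BiMap initialized here is what makes the value-to-id mapping reversible: falseIds resolves a value to its id, and the map's inverse view resolves an id back to its value. A self-contained Guava sketch of that pattern (the class name and the ids 0 and 1 are invented):

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;

class BiMapSketch
{
  public static void main(String[] args)
  {
    // Sketch only: the two-way value/id mapping backing the dim lookup.
    final BiMap<String, Integer> falseIds =
        Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
    falseIds.put("a", 0);
    falseIds.put("b", 1);
    System.out.println(falseIds.get("b"));         // 1
    System.out.println(falseIds.inverse().get(1)); // "b"
  }
}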
@@ -27,6 +27,7 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.incremental.IncrementalIndex;
@@ -60,7 +61,7 @@ public class IndexMergerTest
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(2, index.getColumnNames().size());
Assert.assertEquals(3, index.getColumnNames().size());
}
finally {
tempDir.delete();
@@ -74,7 +75,7 @@ public class IndexMergerTest
IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(true);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")});
toPersist2.add(
new MapBasedInputRow(
@@ -100,25 +101,25 @@ public class IndexMergerTest
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
Assert.assertEquals(3, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{},
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir
)
);
Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(2, merged.getColumnNames().size());
Assert.assertEquals(3, merged.getColumnNames().size());
}
finally {
FileUtils.deleteQuietly(tempDir1);
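The test edits in this hunk appear to track the refactoring's expectation that an index carries at least one metric: the previously empty AggregatorFactory arrays gain a count aggregator, and the persisted and merged segments accordingly report three columns (dim1, dim2, count) rather than two. A sketch of building such an index, mirroring the updated test code:

// Sketch only: an on-heap index carrying the count metric the tests now use.
IncrementalIndex index = new OnheapIncrementalIndex(
    0L,                    // minTimestamp
    QueryGranularity.NONE, // no time truncation
    new AggregatorFactory[]{new CountAggregatorFactory("count")}
);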
@@ -25,6 +25,7 @@ import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
@@ -99,13 +100,13 @@ public class IncrementalIndexTest
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{},
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true
);
} else {
return new OnheapIncrementalIndex(
0L, QueryGranularity.NONE, new AggregatorFactory[]{}
0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
}
}