diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 6ff2b70fa87..f10f1079851 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -76,7 +76,7 @@ public class GroupByQueryHelper } ); final IncrementalIndex index; - if(query.getContextValue("useOffheap", false)){ + if (query.getContextValue("useOffheap", false)) { index = new OffheapIncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart @@ -87,14 +87,14 @@ public class GroupByQueryHelper false ); } else { - index = new OnheapIncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]), - false - ); + index = new OnheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false + ); } Accumulator accumulator = new Accumulator() diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 3c2d8844f82..1f45ae61577 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -20,71 +20,599 @@ package io.druid.segment.incremental; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import com.metamx.common.IAE; +import com.metamx.common.ISE; import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.data.input.impl.SpatialDimensionSchema; +import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; +import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; /** */ -public interface IncrementalIndex extends Iterable, Closeable +public abstract class IncrementalIndex implements Iterable, Closeable { + protected static ColumnSelectorFactory makeColumnSelectorFactory( + final AggregatorFactory agg, + final ThreadLocal in, + final boolean deserializeComplexMetrics + ) + { + return new ColumnSelectorFactory() + { + @Override + public LongColumnSelector makeLongColumnSelector(final String columnName) + { + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getTimestampFromEpoch(); + } + }; + } + return new LongColumnSelector() + { + @Override + public long get() + { + return in.get().getLongMetric(columnName); + } + }; + } - List getDimensions(); + @Override + public FloatColumnSelector makeFloatColumnSelector(final String columnName) + { + return new FloatColumnSelector() + { + @Override + public float get() + { + return in.get().getFloatMetric(columnName); + } + }; + } - ConcurrentNavigableMap getFacts(); + @Override + public ObjectColumnSelector makeObjectColumnSelector(final String column) + { + final String typeName = agg.getTypeName(); - Integer getDimensionIndex(String dimension); + final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } - List getMetricNames(); + @Override + public Object get() + { + return in.get().getRaw(column); + } + }; - DimDim getDimension(String dimension); + if (!deserializeComplexMetrics) { + return rawColumnSelector; + } else { + if (typeName.equals("float")) { + return rawColumnSelector; + } - AggregatorFactory[] getMetricAggs(); + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + if (serde == null) { + throw new ISE("Don't know how to handle type[%s]", typeName); + } - Interval getInterval(); + final ComplexMetricExtractor extractor = serde.getExtractor(); + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return extractor.extractedClass(); + } - DateTime getMinTime(); + @Override + public Object get() + { + return extractor.extractValue(in.get(), column); + } + }; + } + } - DateTime getMaxTime(); + @Override + public DimensionSelector makeDimensionSelector(final String dimension) + { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = in.get().getDimension(dimension); + final ArrayList vals = Lists.newArrayList(); + if (dimensionValues != null) { + for (int i = 0; i < dimensionValues.size(); ++i) { + vals.add(i); + } + } - boolean isEmpty(); + return new IndexedInts() + { + @Override + public int size() + { + return vals.size(); + } - ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end); + @Override + public int get(int index) + { + return vals.get(index); + } - Integer getMetricIndex(String columnName); + @Override + public Iterator iterator() + { + return vals.iterator(); + } + }; + } - String getMetricType(String metric); + @Override + public int getValueCardinality() + { + throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); + } - ColumnCapabilities getCapabilities(String column); + @Override + public String lookupName(int id) + { + return in.get().getDimension(dimension).get(id); + } - int size(); + @Override + public int lookupId(String name) + { + return in.get().getDimension(dimension).indexOf(name); + } + }; + } + }; + } - float getMetricFloatValue(int rowOffset, int aggOffset); + private final long minTimestamp; + private final QueryGranularity gran; + private final List> rowTransformers; + private final AggregatorFactory[] metrics; + private final Map metricIndexes; + private final Map metricTypes; + private final ImmutableList metricNames; + private final LinkedHashMap dimensionOrder; + private final AggregatorType[] aggs; + private final DimensionHolder dimValues; + private final Map columnCapabilities; + private final boolean deserializeComplexMetrics; - long getMetricLongValue(int rowOffset, int aggOffset); + protected final CopyOnWriteArrayList dimensions; - Object getMetricObjectValue(int rowOffset, int aggOffset); + private volatile AtomicInteger numEntries = new AtomicInteger(); - Iterable iterableWithPostAggregations(List postAggregatorSpecs); + // This is modified on add() in a critical section. + private ThreadLocal in = new ThreadLocal<>(); - int add(InputRow inputRow); + /** + * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that + * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. + * + * @param incrementalIndexSchema the schema to use for incremental index + * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input + * value for aggregators that return metrics other than float. + */ + public IncrementalIndex( + final IncrementalIndexSchema incrementalIndexSchema, + final boolean deserializeComplexMetrics + ) + { + this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); + this.gran = incrementalIndexSchema.getGran(); + this.metrics = incrementalIndexSchema.getMetrics(); + this.rowTransformers = Lists.newCopyOnWriteArrayList(); + this.deserializeComplexMetrics = deserializeComplexMetrics; - void close(); + final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); + final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); + final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); + this.aggs = initAggs(metrics, in, deserializeComplexMetrics); - InputRow formatRow(InputRow parse); + for (int i = 0; i < metrics.length; i++) { + final String metricName = metrics[i].getName(); + metricNamesBuilder.add(metricName); + metricIndexesBuilder.put(metricName, i); + metricTypesBuilder.put(metricName, metrics[i].getTypeName()); + } + metricNames = metricNamesBuilder.build(); + metricIndexes = metricIndexesBuilder.build(); + metricTypes = metricTypesBuilder.build(); + + this.dimensionOrder = Maps.newLinkedHashMap(); + this.dimensions = new CopyOnWriteArrayList<>(); + // This should really be more generic + List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); + if (!spatialDimensions.isEmpty()) { + this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); + } + + this.columnCapabilities = Maps.newHashMap(); + for (Map.Entry entry : metricTypes.entrySet()) { + ValueType type; + if (entry.getValue().equalsIgnoreCase("float")) { + type = ValueType.FLOAT; + } else if (entry.getValue().equalsIgnoreCase("long")) { + type = ValueType.LONG; + } else { + type = ValueType.COMPLEX; + } + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(type); + columnCapabilities.put(entry.getKey(), capabilities); + } + for (String dimension : dimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + for (SpatialDimensionSchema spatialDimension : spatialDimensions) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + capabilities.setHasSpatialIndexes(true); + columnCapabilities.put(spatialDimension.getDimName(), capabilities); + } + this.dimValues = new DimensionHolder(); + } + + public abstract ConcurrentNavigableMap getFacts(); + + protected abstract DimDim makeDimDim(String dimension); + + protected abstract AggregatorType[] initAggs( + AggregatorFactory[] metrics, + ThreadLocal in, + boolean deserializeComplexMetrics + ); + + protected abstract Integer addToFacts( + AggregatorFactory[] metrics, + boolean deserializeComplexMetrics, + InputRow row, + AtomicInteger numEntries, + TimeAndDims key, + ThreadLocal in + ); + + protected abstract AggregatorType[] getAggsForRow(int rowOffset); + + protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition); + + protected abstract float getMetricFloatValue(int rowOffset, int aggOffset); + + protected abstract long getMetricLongValue(int rowOffset, int aggOffset); + + protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset); + + @Override + public void close() + { + // Nothing to close + } + + public InputRow formatRow(InputRow row) + { + for (Function rowTransformer : rowTransformers) { + row = rowTransformer.apply(row); + } + + if (row == null) { + throw new IAE("Row is null? How can this be?!"); + } + return row; + } + + /** + * Adds a new row. The row might correspond with another row that already exists, in which case this will + * update that row instead of inserting a new one. + *

+ *

+ * Calls to add() are thread safe. + *

+ * + * @param row the row of data to add + * + * @return the number of rows in the data set after adding the InputRow + */ + public int add(InputRow row) + { + row = formatRow(row); + if (row.getTimestampFromEpoch() < minTimestamp) { + throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); + } + + final List rowDimensions = row.getDimensions(); + + String[][] dims; + List overflow = null; + synchronized (dimensionOrder) { + dims = new String[dimensionOrder.size()][]; + for (String dimension : rowDimensions) { + List dimensionValues = row.getDimension(dimension); + + // Set column capabilities as data is coming in + ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } + if (dimensionValues.size() > 1) { + capabilities.setHasMultipleValues(true); + } + + Integer index = dimensionOrder.get(dimension); + if (index == null) { + dimensionOrder.put(dimension, dimensionOrder.size()); + dimensions.add(dimension); + + if (overflow == null) { + overflow = Lists.newArrayList(); + } + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else { + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + } + } + } + + if (overflow != null) { + // Merge overflow and non-overflow + String[][] newDims = new String[dims.length + overflow.size()][]; + System.arraycopy(dims, 0, newDims, 0, dims.length); + for (int i = 0; i < overflow.size(); ++i) { + newDims[dims.length + i] = overflow.get(i); + } + dims = newDims; + } + + final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); + return addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in); + } + + public boolean isEmpty() + { + return numEntries.get() == 0; + } + + public int size() + { + return numEntries.get(); + } + + private long getMinTimeMillis() + { + return getFacts().firstKey().getTimestamp(); + } + + private long getMaxTimeMillis() + { + return getFacts().lastKey().getTimestamp(); + } + + private String[] getDimVals(final DimDim dimLookup, final List dimValues) + { + final String[] retVal = new String[dimValues.size()]; + + int count = 0; + for (String dimValue : dimValues) { + String canonicalDimValue = dimLookup.get(dimValue); + if (!dimLookup.contains(canonicalDimValue)) { + dimLookup.add(dimValue); + } + retVal[count] = canonicalDimValue; + count++; + } + Arrays.sort(retVal); + + return retVal; + } + + public AggregatorType[] getAggs() + { + return aggs; + } + + public AggregatorFactory[] getMetricAggs() + { + return metrics; + } + + public DimensionHolder getDimValues() + { + return dimValues; + } + + public List getDimensions() + { + return dimensions; + } + + public String getMetricType(String metric) + { + return metricTypes.get(metric); + } + + public Interval getInterval() + { + return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); + } + + public DateTime getMinTime() + { + return isEmpty() ? null : new DateTime(getMinTimeMillis()); + } + + public DateTime getMaxTime() + { + return isEmpty() ? null : new DateTime(getMaxTimeMillis()); + } + + public DimDim getDimension(String dimension) + { + return isEmpty() ? null : dimValues.get(dimension); + } + + public Integer getDimensionIndex(String dimension) + { + return dimensionOrder.get(dimension); + } + + public List getMetricNames() + { + return metricNames; + } + + public Integer getMetricIndex(String metricName) + { + return metricIndexes.get(metricName); + } + + public ColumnCapabilities getCapabilities(String column) + { + return columnCapabilities.get(column); + } + + public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) + { + return getFacts().subMap(start, end); + } + + @Override + public Iterator iterator() + { + return iterableWithPostAggregations(null).iterator(); + } + + public Iterable iterableWithPostAggregations(final List postAggs) + { + return new Iterable() + { + @Override + public Iterator iterator() + { + return Iterators.transform( + getFacts().entrySet().iterator(), + new Function, Row>() + { + @Override + public Row apply(final Map.Entry input) + { + final TimeAndDims timeAndDims = input.getKey(); + final int rowOffset = input.getValue(); + + String[][] theDims = timeAndDims.getDims(); + + Map theVals = Maps.newLinkedHashMap(); + for (int i = 0; i < theDims.length; ++i) { + String[] dim = theDims[i]; + if (dim != null && dim.length != 0) { + theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); + } + } + + AggregatorType[] aggs = getAggsForRow(rowOffset); + for (int i = 0; i < aggs.length; ++i) { + theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i)); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(timeAndDims.getTimestamp(), theVals); + } + } + ); + } + }; + } + + class DimensionHolder + { + private final Map dimensions; + + DimensionHolder() + { + dimensions = Maps.newConcurrentMap(); + } + + DimDim add(String dimension) + { + DimDim holder = dimensions.get(dimension); + if (holder == null) { + holder = makeDimDim(dimension); + dimensions.put(dimension, holder); + } else { + throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + } + return holder; + } + + DimDim get(String dimension) + { + return dimensions.get(dimension); + } + } static interface DimDim { @@ -189,8 +717,7 @@ public interface IncrementalIndex extends Iterable, Closeable return Arrays.asList(input); } } - ) + - '}'; + ) + '}'; } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 77d0d9e0c62..73e452ed0ec 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -46,11 +46,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter { private static final Logger log = new Logger(IncrementalIndexAdapter.class); private final Interval dataInterval; - private final IncrementalIndex index; + private final IncrementalIndex index; private final Map> invertedIndexes; public IncrementalIndexAdapter( - Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory + Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory ) { this.dataInterval = dataInterval; diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 956e1815fbe..5644621bcc6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -19,41 +19,14 @@ package io.druid.segment.incremental; - -import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedRow; -import io.druid.data.input.Row; -import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.aggregation.PostAggregator; -import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.Column; -import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ColumnCapabilitiesImpl; -import io.druid.segment.column.ValueType; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.serde.ComplexMetricExtractor; -import io.druid.segment.serde.ComplexMetricSerde; -import io.druid.segment.serde.ComplexMetrics; -import org.joda.time.DateTime; -import org.joda.time.Interval; import org.mapdb.BTreeKeySerializer; import org.mapdb.DB; import org.mapdb.DBMaker; @@ -65,273 +38,53 @@ import java.io.IOException; import java.io.Serializable; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -public class OffheapIncrementalIndex implements IncrementalIndex +/** + */ +public class OffheapIncrementalIndex extends IncrementalIndex { - private final long minTimestamp; - private final QueryGranularity gran; - private final List> rowTransformers; - private final AggregatorFactory[] metrics; - private final Map metricIndexes; - private final Map metricTypes; - private final ImmutableList metricNames; - private final BufferAggregator[] aggs; - private final int[] aggPositionOffsets; - private final int totalAggSize; - private final LinkedHashMap dimensionOrder; - protected final CopyOnWriteArrayList dimensions; - private final DimensionHolder dimValues; - private final Map columnCapabilities; - private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; + private final DB db; private final DB factsDb; - private volatile AtomicInteger numEntries = new AtomicInteger(); - // This is modified on add() in a critical section. - private ThreadLocal in = new ThreadLocal<>(); + private final int[] aggPositionOffsets; + private final int totalAggSize; + private final ConcurrentNavigableMap facts; - /** - * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that - * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. - * - * @param incrementalIndexSchema - * @param bufferPool - * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input - * value for aggregators that return metrics other than float. - */ public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, StupidPool bufferPool, - final boolean deserializeComplexMetrics + boolean deserializeComplexMetrics ) { - this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); - this.gran = incrementalIndexSchema.getGran(); - this.metrics = incrementalIndexSchema.getMetrics(); - this.rowTransformers = Lists.newCopyOnWriteArrayList(); + super(incrementalIndexSchema, deserializeComplexMetrics); - final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); - final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); - final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); - this.aggs = new BufferAggregator[metrics.length]; + this.bufferHolder = bufferPool.take(); + + final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics(); this.aggPositionOffsets = new int[metrics.length]; + int currAggSize = 0; for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorizeBuffered( - new ColumnSelectorFactory() - { - @Override - public LongColumnSelector makeLongColumnSelector(final String columnName) - { - if (columnName.equals(Column.TIME_COLUMN_NAME)) { - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getTimestampFromEpoch(); - } - }; - } - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getLongMetric(columnName); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(final String columnName) - { - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(columnName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(final String column) - { - final String typeName = agg.getTypeName(); - - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return in.get().getRaw(column); - } - }; - - if (!deserializeComplexMetrics) { - return rawColumnSelector; - } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in.get(), column); - } - }; - } - } - - @Override - public DimensionSelector makeDimensionSelector(final String dimension) - { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = in.get().getDimension(dimension); - final ArrayList vals = Lists.newArrayList(); - if (dimensionValues != null) { - for (int i = 0; i < dimensionValues.size(); ++i) { - vals.add(i); - } - } - - return new IndexedInts() - { - @Override - public int size() - { - return vals.size(); - } - - @Override - public int get(int index) - { - return vals.get(index); - } - - @Override - public Iterator iterator() - { - return vals.iterator(); - } - }; - } - - @Override - public int getValueCardinality() - { - throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); - } - - @Override - public String lookupName(int id) - { - return in.get().getDimension(dimension).get(id); - } - - @Override - public int lookupId(String name) - { - return in.get().getDimension(dimension).indexOf(name); - } - }; - } - } - ); aggPositionOffsets[i] = currAggSize; currAggSize += agg.getMaxIntermediateSize(); - final String metricName = metrics[i].getName(); - metricNamesBuilder.add(metricName); - metricIndexesBuilder.put(metricName, i); - metricTypesBuilder.put(metricName, metrics[i].getTypeName()); } - metricNames = metricNamesBuilder.build(); - metricIndexes = metricIndexesBuilder.build(); - metricTypes = metricTypesBuilder.build(); - this.totalAggSize = currAggSize; - this.dimensionOrder = Maps.newLinkedHashMap(); - this.dimensions = new CopyOnWriteArrayList<>(); - // This should really be more generic - List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); - if (!spatialDimensions.isEmpty()) { - this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); - } - - this.columnCapabilities = Maps.newHashMap(); - for (Map.Entry entry : metricTypes.entrySet()) { - ValueType type; - if (entry.getValue().equalsIgnoreCase("float")) { - type = ValueType.FLOAT; - } else if (entry.getValue().equalsIgnoreCase("long")) { - type = ValueType.LONG; - } else { - type = ValueType.COMPLEX; - } - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(type); - columnCapabilities.put(entry.getKey(), capabilities); - } - for (String dimension : dimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - for (SpatialDimensionSchema spatialDimension : spatialDimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - capabilities.setHasSpatialIndexes(true); - columnCapabilities.put(spatialDimension.getDimName(), capabilities); - } - this.bufferHolder = bufferPool.take(); - this.dimValues = new DimensionHolder(); final DBMaker dbMaker = DBMaker.newMemoryDirectDB() .transactionDisable() .asyncWriteEnable() .cacheSoftRefEnable(); - factsDb = dbMaker.make(); - db = dbMaker.make(); + this.factsDb = dbMaker.make(); + this.db = dbMaker.make(); final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) .keySerializer(timeAndDimsSerializer) @@ -340,7 +93,6 @@ public class OffheapIncrementalIndex implements IncrementalIndex .make(); } - public OffheapIncrementalIndex( long minTimestamp, QueryGranularity gran, @@ -360,85 +112,45 @@ public class OffheapIncrementalIndex implements IncrementalIndex } @Override - public InputRow formatRow(InputRow row) + public ConcurrentNavigableMap getFacts() { - for (Function rowTransformer : rowTransformers) { - row = rowTransformer.apply(row); - } - - if (row == null) { - throw new IAE("Row is null? How can this be?!"); - } - return row; + return facts; } - /** - * Adds a new row. The row might correspond with another row that already exists, in which case this will - * update that row instead of inserting a new one. - *

- *

- * Calls to add() are thread safe. - *

- * - * @param row the row of data to add - * - * @return the number of rows in the data set after adding the InputRow - */ @Override - public int add(InputRow row) + protected DimDim makeDimDim(String dimension) { - row = formatRow(row); - if (row.getTimestampFromEpoch() < minTimestamp) { - throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); + return new OffheapDimDim(dimension); + } + + @Override + protected BufferAggregator[] initAggs( + AggregatorFactory[] metrics, + ThreadLocal in, + boolean deserializeComplexMetrics + ) + { + BufferAggregator[] aggs = new BufferAggregator[metrics.length]; + for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + aggs[i] = agg.factorizeBuffered( + makeColumnSelectorFactory(agg, in, deserializeComplexMetrics) + ); } + return aggs; + } - final List rowDimensions = row.getDimensions(); - - String[][] dims; - List overflow = null; - synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; - for (String dimension : rowDimensions) { - List dimensionValues = row.getDimension(dimension); - - // Set column capabilities as data is coming in - ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - if (dimensionValues.size() > 1) { - capabilities.setHasMultipleValues(true); - } - - Integer index = dimensionOrder.get(dimension); - if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); - - if (overflow == null) { - overflow = Lists.newArrayList(); - } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); - } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); - } - } - } - - - if (overflow != null) { - // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; - System.arraycopy(dims, 0, newDims, 0, dims.length); - for (int i = 0; i < overflow.size(); ++i) { - newDims[dims.length + i] = overflow.get(i); - } - dims = newDims; - } - - final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); + @Override + protected Integer addToFacts( + AggregatorFactory[] metrics, + boolean deserializeComplexMetrics, + InputRow row, + AtomicInteger numEntries, + TimeAndDims key, + ThreadLocal in + ) + { + final BufferAggregator[] aggs = getAggs(); Integer rowOffset; synchronized (this) { rowOffset = totalAggSize * numEntries.get(); @@ -467,195 +179,33 @@ public class OffheapIncrementalIndex implements IncrementalIndex } @Override - public boolean isEmpty() + protected BufferAggregator[] getAggsForRow(int rowOffset) { - return numEntries.get() == 0; + return getAggs(); } @Override - public int size() + protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) { - return numEntries.get(); - } - - private long getMinTimeMillis() - { - return facts.firstKey().getTimestamp(); - } - - private long getMaxTimeMillis() - { - return facts.lastKey().getTimestamp(); - } - - private String[] getDimVals(final DimDim dimLookup, final List dimValues) - { - final String[] retVal = new String[dimValues.size()]; - - int count = 0; - for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.get(dimValue); - if (!dimLookup.contains(canonicalDimValue)) { - dimLookup.add(dimValue); - } - retVal[count] = canonicalDimValue; - count++; - } - Arrays.sort(retVal); - - return retVal; - } - - @Override - public AggregatorFactory[] getMetricAggs() - { - return metrics; - } - - @Override - public List getDimensions() - { - return dimensions; - } - - @Override - public String getMetricType(String metric) - { - return metricTypes.get(metric); - } - - @Override - public Interval getInterval() - { - return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); - } - - @Override - public DateTime getMinTime() - { - return isEmpty() ? null : new DateTime(getMinTimeMillis()); - } - - @Override - public DateTime getMaxTime() - { - return isEmpty() ? null : new DateTime(getMaxTimeMillis()); - } - - @Override - public DimDim getDimension(String dimension) - { - return isEmpty() ? null : dimValues.get(dimension); - } - - @Override - public Integer getDimensionIndex(String dimension) - { - return dimensionOrder.get(dimension); - } - - @Override - public List getMetricNames() - { - return metricNames; - } - - @Override - public Integer getMetricIndex(String metricName) - { - return metricIndexes.get(metricName); - } - - private int getMetricPosition(int rowOffset, int metricIndex) - { - return rowOffset + aggPositionOffsets[metricIndex]; + return agg.get(bufferHolder.get(), getMetricPosition(rowOffset, aggPosition)); } @Override public float getMetricFloatValue(int rowOffset, int aggOffset) { - return aggs[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); + return getAggs()[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); } @Override public long getMetricLongValue(int rowOffset, int aggOffset) { - return aggs[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); + return getAggs()[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); } @Override public Object getMetricObjectValue(int rowOffset, int aggOffset) { - return aggs[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); - } - - @Override - public ColumnCapabilities getCapabilities(String column) - { - return columnCapabilities.get(column); - } - - @Override - public ConcurrentNavigableMap getFacts() - { - return facts; - } - - @Override - public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) - { - return facts.subMap(start, end); - } - - @Override - public Iterator iterator() - { - return iterableWithPostAggregations(null).iterator(); - } - - public Iterable iterableWithPostAggregations(final List postAggs) - { - return new Iterable() - { - @Override - public Iterator iterator() - { - return Iterators.transform( - facts.entrySet().iterator(), - new Function, Row>() - { - @Override - public Row apply(final Map.Entry input) - { - final TimeAndDims timeAndDims = input.getKey(); - final int rowOffset = input.getValue(); - - String[][] theDims = timeAndDims.getDims(); - - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; - if (dim != null && dim.length != 0) { - theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); - } - } - - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get(bufferHolder.get(), getMetricPosition(rowOffset, i))); - } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(timeAndDims.getTimestamp(), theVals); - } - } - ); - } - }; + return getAggs()[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); } @Override @@ -669,34 +219,18 @@ public class OffheapIncrementalIndex implements IncrementalIndex } } - class DimensionHolder + private int getMetricPosition(int rowOffset, int metricIndex) { - private final Map dimensions; - - DimensionHolder() - { - dimensions = Maps.newConcurrentMap(); - } - - DimDim add(String dimension) - { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = new OffheapDimDim(dimension); - dimensions.put(dimension, holder); - } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); - } - return holder; - } - - DimDim get(String dimension) - { - return dimensions.get(dimension); - } + return rowOffset + aggPositionOffsets[metricIndex]; } - public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable + private DimDim getDimDim(int dimIndex) + { + return getDimValues().get(getDimensions().get(dimIndex)); + } + + // MapDB forces serializers to implement serializable, which sucks + private static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable { private final TimeAndDimsComparator comparator; private final transient OffheapIncrementalIndex incrementalIndex; @@ -719,7 +253,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex if (dims == null) { out.write(-1); } else { - DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + DimDim dimDim = incrementalIndex.getDimDim(index); out.writeInt(dims.length); for (String value : dims) { out.writeInt(dimDim.getId(value)); @@ -740,7 +274,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex for (int k = 0; k < dims.length; k++) { int len = in.readInt(); if (len != -1) { - DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); + DimDim dimDim = incrementalIndex.getDimDim(k); String[] col = new String[len]; for (int l = 0; l < col.length; l++) { col[l] = dimDim.get(dimDim.getValue(in.readInt())); @@ -760,7 +294,7 @@ public class OffheapIncrementalIndex implements IncrementalIndex } } - public static class TimeAndDimsComparator implements Comparator, Serializable + private static class TimeAndDimsComparator implements Comparator, Serializable { @Override public int compare(Object o1, Object o2) @@ -773,8 +307,8 @@ public class OffheapIncrementalIndex implements IncrementalIndex { private final Map falseIds; private final Map falseIdsReverse; - private final WeakHashMap> cache = - new WeakHashMap(); + private final WeakHashMap> cache = new WeakHashMap(); + private volatile String[] sortedVals = null; // size on MapDB is slow so maintain a count here private volatile int size = 0; @@ -874,5 +408,4 @@ public class OffheapIncrementalIndex implements IncrementalIndex return s1.equals(s2); } } - } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 47195073031..29483942ea8 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -19,144 +19,51 @@ package io.druid.segment.incremental; -import com.google.common.base.Function; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedRow; -import io.druid.data.input.Row; -import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; -import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.Column; -import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ColumnCapabilitiesImpl; -import io.druid.segment.column.ValueType; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.serde.ComplexMetricExtractor; -import io.druid.segment.serde.ComplexMetricSerde; -import io.druid.segment.serde.ComplexMetrics; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class OnheapIncrementalIndex implements IncrementalIndex +public class OnheapIncrementalIndex extends IncrementalIndex { - private final long minTimestamp; - private final QueryGranularity gran; - private final List> rowTransformers; - private final AggregatorFactory[] metrics; - private final Map metricIndexes; - private final Map metricTypes; - private final ImmutableList metricNames; - private final LinkedHashMap dimensionOrder; - protected final CopyOnWriteArrayList dimensions; - private final DimensionHolder dimValues; - private final Map columnCapabilities; private final ConcurrentNavigableMap facts; - private final List aggList; - private volatile AtomicInteger numEntries = new AtomicInteger(); - // This is modified on add() in a critical section. - private ThreadLocal in = new ThreadLocal<>(); - private final boolean deserializeComplexMetrics; + private final List aggList = Lists.newArrayList(); + + public OnheapIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics) + { + super(incrementalIndexSchema, deserializeComplexMetrics); + this.facts = new ConcurrentSkipListMap<>(); + } - /** - * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that - * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. - * - * @param incrementalIndexSchema - * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input - * value for aggregators that return metrics other than float. - */ public OnheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - final boolean deserializeComplexMetrics + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + boolean deserializeComplexMetrics ) { - this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); - this.gran = incrementalIndexSchema.getGran(); - this.metrics = incrementalIndexSchema.getMetrics(); - this.rowTransformers = Lists.newCopyOnWriteArrayList(); - - final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); - final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); - final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); - this.aggList = Lists.newArrayList(); - - for (int i = 0; i < metrics.length; i++) { - final String metricName = metrics[i].getName(); - metricNamesBuilder.add(metricName); - metricIndexesBuilder.put(metricName, i); - metricTypesBuilder.put(metricName, metrics[i].getTypeName()); - } - - metricNames = metricNamesBuilder.build(); - metricIndexes = metricIndexesBuilder.build(); - metricTypes = metricTypesBuilder.build(); - - this.dimensionOrder = Maps.newLinkedHashMap(); - this.dimensions = new CopyOnWriteArrayList<>(); - // This should really be more generic - List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); - if (!spatialDimensions.isEmpty()) { - this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); - } - - this.columnCapabilities = Maps.newHashMap(); - for (Map.Entry entry : metricTypes.entrySet()) { - ValueType type; - if (entry.getValue().equalsIgnoreCase("float")) { - type = ValueType.FLOAT; - } else if (entry.getValue().equalsIgnoreCase("long")) { - type = ValueType.LONG; - } else { - type = ValueType.COMPLEX; - } - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(type); - columnCapabilities.put(entry.getKey(), capabilities); - } - for (String dimension : dimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - for (SpatialDimensionSchema spatialDimension : spatialDimensions) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - capabilities.setHasSpatialIndexes(true); - columnCapabilities.put(spatialDimension.getDimName(), capabilities); - } - this.dimValues = new DimensionHolder(); - this.facts = new ConcurrentSkipListMap<>(); - this.deserializeComplexMetrics = deserializeComplexMetrics; + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + deserializeComplexMetrics + ); } public OnheapIncrementalIndex( @@ -181,102 +88,36 @@ public class OnheapIncrementalIndex implements IncrementalIndex this(incrementalIndexSchema, true); } - public OnheapIncrementalIndex( - long minTimestamp, - QueryGranularity gran, - final AggregatorFactory[] metrics, - boolean deserializeComplexMetrics + @Override + public ConcurrentNavigableMap getFacts() + { + return facts; + } + + @Override + protected DimDim makeDimDim(String dimension) + { + return new OnHeapDimDim(); + } + + @Override + protected Aggregator[] initAggs( + AggregatorFactory[] metrics, ThreadLocal in, boolean deserializeComplexMetrics ) { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - deserializeComplexMetrics - ); + return new Aggregator[metrics.length]; } @Override - public InputRow formatRow(InputRow row) + protected Integer addToFacts( + AggregatorFactory[] metrics, + boolean deserializeComplexMetrics, + InputRow row, + AtomicInteger numEntries, + TimeAndDims key, + ThreadLocal in + ) { - for (Function rowTransformer : rowTransformers) { - row = rowTransformer.apply(row); - } - - if (row == null) { - throw new IAE("Row is null? How can this be?!"); - } - return row; - } - - /** - * Adds a new row. The row might correspond with another row that already exists, in which case this will - * update that row instead of inserting a new one. - *

- *

- * Calls to add() are thread safe. - *

- * - * @param row the row of data to add - * - * @return the number of rows in the data set after adding the InputRow - */ - @Override - public int add(InputRow row) - { - row = formatRow(row); - if (row.getTimestampFromEpoch() < minTimestamp) { - throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); - } - - final List rowDimensions = row.getDimensions(); - - String[][] dims; - List overflow = null; - synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; - for (String dimension : rowDimensions) { - List dimensionValues = row.getDimension(dimension); - - // Set column capabilities as data is coming in - ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); - } - if (dimensionValues.size() > 1) { - capabilities.setHasMultipleValues(true); - } - - Integer index = dimensionOrder.get(dimension); - if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); - - if (overflow == null) { - overflow = Lists.newArrayList(); - } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); - } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); - } - } - } - - - if (overflow != null) { - // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; - System.arraycopy(dims, 0, newDims, 0, dims.length); - for (int i = 0; i < overflow.size(); ++i) { - newDims[dims.length + i] = overflow.get(i); - } - dims = newDims; - } - - final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); Integer rowOffset; synchronized (this) { rowOffset = numEntries.get(); @@ -288,160 +129,17 @@ public class OnheapIncrementalIndex implements IncrementalIndex for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - new ColumnSelectorFactory() - { - @Override - public LongColumnSelector makeLongColumnSelector(final String columnName) - { - if (columnName.equals(Column.TIME_COLUMN_NAME)) { - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getTimestampFromEpoch(); - } - }; - } - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getLongMetric(columnName); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(final String columnName) - { - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(columnName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(final String column) - { - final String typeName = agg.getTypeName(); - - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return in.get().getRaw(column); - } - }; - - if (!deserializeComplexMetrics) { - return rawColumnSelector; - } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } - - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); - if (serde == null) { - throw new ISE("Don't know how to handle type[%s]", typeName); - } - - final ComplexMetricExtractor extractor = serde.getExtractor(); - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return extractor.extractedClass(); - } - - @Override - public Object get() - { - return extractor.extractValue(in.get(), column); - } - }; - } - } - - @Override - public DimensionSelector makeDimensionSelector(final String dimension) - { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = in.get().getDimension(dimension); - final ArrayList vals = Lists.newArrayList(); - if (dimensionValues != null) { - for (int i = 0; i < dimensionValues.size(); ++i) { - vals.add(i); - } - } - - return new IndexedInts() - { - @Override - public int size() - { - return vals.size(); - } - - @Override - public int get(int index) - { - return vals.get(index); - } - - @Override - public Iterator iterator() - { - return vals.iterator(); - } - }; - } - - @Override - public int getValueCardinality() - { - throw new UnsupportedOperationException("value cardinality is unknown in incremental index"); - } - - @Override - public String lookupName(int id) - { - return in.get().getDimension(dimension).get(id); - } - - @Override - public int lookupId(String name) - { - return in.get().getDimension(dimension).indexOf(name); - } - }; - } - } + makeColumnSelectorFactory(agg, in, deserializeComplexMetrics) ); } aggList.add(aggs); numEntries.incrementAndGet(); } } + in.set(row); - Aggregator[] aggs = aggList.get(rowOffset); + + final Aggregator[] aggs = aggList.get(rowOffset); for (int i = 0; i < aggs.length; i++) { synchronized (aggs[i]) { aggs[i].aggregate(); @@ -452,15 +150,15 @@ public class OnheapIncrementalIndex implements IncrementalIndex } @Override - public boolean isEmpty() + protected Aggregator[] getAggsForRow(int rowOffset) { - return numEntries.get() == 0; + return aggList.get(rowOffset); } @Override - public int size() + protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition) { - return numEntries.get(); + return agg.get(); } @Override @@ -481,196 +179,14 @@ public class OnheapIncrementalIndex implements IncrementalIndex return aggList.get(rowOffset)[aggOffset].get(); } - private long getMinTimeMillis() - { - return facts.firstKey().getTimestamp(); - } - - private long getMaxTimeMillis() - { - return facts.lastKey().getTimestamp(); - } - - private String[] getDimVals(final DimDim dimLookup, final List dimValues) - { - final String[] retVal = new String[dimValues.size()]; - - int count = 0; - for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.get(dimValue); - if (!dimLookup.contains(canonicalDimValue)) { - dimLookup.add(dimValue); - } - retVal[count] = canonicalDimValue; - count++; - } - Arrays.sort(retVal); - - return retVal; - } - - @Override - public AggregatorFactory[] getMetricAggs() - { - return metrics; - } - - @Override - public List getDimensions() - { - return dimensions; - } - - @Override - public String getMetricType(String metric) - { - return metricTypes.get(metric); - } - - public Interval getInterval() - { - return new Interval(minTimestamp, isEmpty() ? minTimestamp : gran.next(getMaxTimeMillis())); - } - - public DateTime getMinTime() - { - return isEmpty() ? null : new DateTime(getMinTimeMillis()); - } - - public DateTime getMaxTime() - { - return isEmpty() ? null : new DateTime(getMaxTimeMillis()); - } - - public DimDim getDimension(String dimension) - { - return isEmpty() ? null : dimValues.get(dimension); - } - - public Integer getDimensionIndex(String dimension) - { - return dimensionOrder.get(dimension); - } - - public List getMetricNames() - { - return metricNames; - } - - public Integer getMetricIndex(String metricName) - { - return metricIndexes.get(metricName); - } - - public ColumnCapabilities getCapabilities(String column) - { - return columnCapabilities.get(column); - } - - public ConcurrentNavigableMap getFacts() - { - return facts; - } - - public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) - { - return facts.subMap(start, end); - } - - @Override - public Iterator iterator() - { - return iterableWithPostAggregations(null).iterator(); - } - - @Override - public Iterable iterableWithPostAggregations(final List postAggs) - { - return new Iterable() - { - @Override - public Iterator iterator() - { - return Iterators.transform( - facts.entrySet().iterator(), - new Function, Row>() - { - @Override - public Row apply(final Map.Entry input) - { - final TimeAndDims timeAndDims = input.getKey(); - final int rowOffset = input.getValue(); - - String[][] theDims = timeAndDims.getDims(); - - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; - if (dim != null && dim.length != 0) { - theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); - } - } - Aggregator[] aggs = aggList.get(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get()); - } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(timeAndDims.getTimestamp(), theVals); - } - } - ); - } - }; - } - - @Override - public void close() - { - // Nothing to close - } - - class DimensionHolder - { - private final Map dimensions; - - DimensionHolder() - { - dimensions = Maps.newConcurrentMap(); - } - - DimDim add(String dimension) - { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = new DimDimImpl(); - dimensions.put(dimension, holder); - } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); - } - return holder; - } - - DimDim get(String dimension) - { - return dimensions.get(dimension); - } - } - - private static class DimDimImpl implements DimDim + private static class OnHeapDimDim implements DimDim { private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); - - public DimDimImpl() + public OnHeapDimDim() { BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); falseIds = biMap; diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index b8630cf07f7..1be4343b389 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -27,6 +27,7 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.column.Column; import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.incremental.IncrementalIndex; @@ -60,7 +61,7 @@ public class IndexMergerTest Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); - Assert.assertEquals(2, index.getColumnNames().size()); + Assert.assertEquals(3, index.getColumnNames().size()); } finally { tempDir.delete(); @@ -74,7 +75,7 @@ public class IndexMergerTest IncrementalIndex toPersist1 = IncrementalIndexTest.createCaseInsensitiveIndex(true); IncrementalIndexTest.populateIndex(timestamp, toPersist1); - IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{}); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}); toPersist2.add( new MapBasedInputRow( @@ -100,25 +101,25 @@ public class IndexMergerTest Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(2, index1.getColumnNames().size()); + Assert.assertEquals(3, index1.getColumnNames().size()); QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2)); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); - Assert.assertEquals(2, index2.getColumnNames().size()); + Assert.assertEquals(3, index2.getColumnNames().size()); QueryableIndex merged = IndexIO.loadIndex( IndexMerger.mergeQueryableIndex( Arrays.asList(index1, index2), - new AggregatorFactory[]{}, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, mergedDir ) ); Assert.assertEquals(3, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(2, merged.getColumnNames().size()); + Assert.assertEquals(3, merged.getColumnNames().size()); } finally { FileUtils.deleteQuietly(tempDir1); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 75bf9a4502b..88f1f6a4fb6 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -157,10 +157,10 @@ public class TestIndex final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); log.info("Realtime loading index file[%s]", resource); final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) - .withQueryGranularity(QueryGranularity.NONE) - .withMetrics(METRIC_AGGS) - .build(); + .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(METRIC_AGGS) + .build(); final IncrementalIndex retVal; if (useOffheap) { retVal = new OffheapIncrementalIndex( diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 8e8fa3baa9b..5e274a5dded 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -25,6 +25,7 @@ import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; @@ -99,13 +100,13 @@ public class IncrementalIndexTest return new OffheapIncrementalIndex( 0L, QueryGranularity.NONE, - new AggregatorFactory[]{}, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, TestQueryRunners.pool, true ); } else { return new OnheapIncrementalIndex( - 0L, QueryGranularity.NONE, new AggregatorFactory[]{} + 0L, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")} ); } }