Merge pull request #2085 from navis/refactor-incremental-index2

Replace string[] with int[] for dimensions
This commit is contained in:
Fangjin Yang 2016-02-02 23:21:28 -08:00
commit 2b84251a7d
11 changed files with 557 additions and 910 deletions

View File

@ -41,8 +41,6 @@ public interface IndexableAdapter
Iterable<Rowboat> getRows();
IndexedInts getBitmapIndex(String dimension, String value);
IndexedInts getBitmapIndex(String dimension, int dictId);
String getMetricType(String metric);

View File

@ -19,6 +19,7 @@
package io.druid.segment;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
@ -294,8 +295,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
};
}
@Override
public IndexedInts getBitmapIndex(String dimension, String value)
@VisibleForTesting
IndexedInts getBitmapIndex(String dimension, String value)
{
final Column column = input.getColumn(dimension);

View File

@ -75,12 +75,6 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
return Iterables.filter(baseAdapter.getRows(), filter);
}
@Override
public IndexedInts getBitmapIndex(String dimension, String value)
{
return baseAdapter.getBitmapIndex(dimension, value);
}
@Override
public String getMetricType(String metric)
{

View File

@ -19,6 +19,7 @@
package io.druid.segment.incremental;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
@ -58,10 +59,13 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -268,6 +272,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final List<DimDim> dimValues;
private final AtomicInteger numEntries = new AtomicInteger();
@ -316,13 +321,11 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec();
this.dimensionDescs = Maps.newLinkedHashMap();
this.dimValues = Collections.synchronizedList(Lists.<DimDim>newArrayList());
for (String dimension : dimensionsSpec.getDimensions()) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
dimensionDescs.put(
dimension,
new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities)
);
addNewDimension(dimension, capabilities);
columnCapabilities.put(dimension, capabilities);
}
@ -341,7 +344,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private DimDim newDimDim(String dimension)
{
return new NullValueConverterDimDim(makeDimDim(dimension));
return new NullValueConverterDimDim(makeDimDim(dimension, dimensionDescs));
}
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
@ -351,7 +354,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public abstract String getOutOfRowsReason();
// use newDimDim
protected abstract DimDim makeDimDim(String dimension);
protected abstract DimDim makeDimDim(String dimension, Object lock);
protected abstract AggregatorType[] initAggs(
AggregatorFactory[] metrics,
@ -383,7 +386,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
@Override
public void close()
{
// Nothing to close
dimValues.clear();
}
public InputRow formatRow(InputRow row)
@ -410,7 +413,15 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row) throws IndexSizeExceededException
public int add(InputRow row) throws IndexSizeExceededException {
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier);
updateMaxIngestedTime(row.getTimestamp());
return rv;
}
@VisibleForTesting
TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
@ -419,10 +430,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
int[][] dims;
List<int[]> overflow = null;
synchronized (dimensionDescs) {
dims = new String[dimensionDescs.size()][];
dims = new int[dimensionDescs.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
@ -445,8 +456,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
if (desc == null) {
desc = new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities);
dimensionDescs.put(dimension, desc);
desc = addNewDimension(dimension, capabilities);
if (overflow == null) {
overflow = Lists.newArrayList();
@ -471,7 +481,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
int[][] newDims = new int[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
@ -479,13 +489,11 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
dims = newDims;
}
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier);
updateMaxIngestedTime(row.getTimestamp());
return rv;
long truncated = gran.truncate(row.getTimestampFromEpoch());
return new TimeAndDims(Math.max(truncated, minTimestamp), dims);
}
public synchronized void updateMaxIngestedTime(DateTime eventTime)
private synchronized void updateMaxIngestedTime(DateTime eventTime)
{
if (maxIngestedEventTime == null || maxIngestedEventTime.isBefore(eventTime)) {
maxIngestedEventTime = eventTime;
@ -512,26 +520,26 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return getFacts().lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
private int[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
if (dimValues.size() == 0) {
// NULL VALUE
if (!dimLookup.contains(null)) {
dimLookup.add(null);
}
dimLookup.add(null);
return null;
}
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (!dimLookup.contains(canonicalDimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
count++;
if (dimValues.size() == 1) {
return new int[]{dimLookup.add(dimValues.get(0))};
}
String[] dimArray = dimValues.toArray(new String[dimValues.size()]);
Arrays.sort(dimArray);
final int[] retVal = new int[dimArray.length];
for (int i = 0; i < dimArray.length; i++) {
retVal[i] = dimLookup.add(dimArray[i]);
}
Arrays.sort(retVal);
return retVal;
}
@ -594,12 +602,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return dimSpec == null ? null : dimSpec.getValues();
}
public Integer getDimensionIndex(String dimension)
{
DimensionDesc dimSpec = getDimension(dimension);
return dimSpec == null ? null : dimSpec.getIndex();
}
public List<String> getDimensionOrder()
{
synchronized (dimensionDescs) {
@ -609,7 +611,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
/*
* Currently called to initialize IncrementalIndex dimension order during index creation
* Index dimension ordering could be changed to initalize from DimensionsSpec after resolution of
* Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of
* https://github.com/druid-io/druid/issues/2011
*/
public void loadDimensionIterable(Iterable<String> oldDimensionOrder)
@ -623,13 +625,25 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dim, capabilities);
DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities);
dimensionDescs.put(dim, desc);
addNewDimension(dim, capabilities);
}
}
}
}
@GuardedBy("dimensionDescs")
private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities)
{
DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities);
if (dimValues.size() != desc.getIndex()) {
throw new ISE("dimensionDescs and dimValues for [%s] is out of sync!!", dim);
}
dimensionDescs.put(dim, desc);
dimValues.add(desc.getValues());
return desc;
}
public List<String> getMetricNames()
{
return ImmutableList.copyOf(metricDescs.keySet());
@ -678,13 +692,13 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
{
final List<String> dimensions = getDimensionNames();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();
return new Iterable<Row>()
{
@Override
public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
@ -695,15 +709,28 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
int[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
int[] dim = theDims[i];
DimensionDesc dimensionDesc = dimensions.get(i);
if (dimensionDesc == null) {
continue;
}
String dimensionName = dimensionDesc.getName();
if (dim == null || dim.length == 0) {
theVals.put(dimensionName, null);
continue;
}
if (dim.length == 1) {
theVals.put(dimensionName, Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[0])));
} else {
theVals.put(dimensions.get(i), null);
String[] dimStringValue = new String[dim.length];
for (int j = 0; j < dimStringValue.length; j++) {
dimStringValue[j] = Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[j]));
}
theVals.put(dimensionName, dimStringValue);
}
}
@ -731,7 +758,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return maxIngestedEventTime;
}
public static class DimensionDesc
public static final class DimensionDesc
{
private final int index;
private final String name;
@ -767,7 +794,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
}
public static class MetricDesc
public static final class MetricDesc
{
private final int index;
private final String name;
@ -812,8 +839,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
static interface DimDim
{
public String get(String value);
public int getId(String value);
public String getValue(int id);
@ -824,13 +849,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public int add(String value);
public int getSortedId(String value);
public SortedDimLookup sort();
}
public String getSortedValue(int index);
static interface SortedDimLookup
{
public int size();
public void sort();
public int idToIndex(int id);
public boolean compareCanonicalValues(String s1, String s2);
public int indexToId(int index);
public String getValue(int index);
}
/**
@ -845,12 +875,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
this.delegate = delegate;
}
@Override
public String get(String value)
{
return delegate.get(Strings.nullToEmpty(value));
}
@Override
public int getId(String value)
{
@ -882,38 +906,54 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
@Override
public int getSortedId(String value)
public SortedDimLookup sort()
{
return delegate.getSortedId(Strings.nullToEmpty(value));
}
@Override
public String getSortedValue(int index)
{
return Strings.emptyToNull(delegate.getSortedValue(index));
}
@Override
public void sort()
{
delegate.sort();
}
@Override
public boolean compareCanonicalValues(String s1, String s2)
{
return delegate.compareCanonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2));
return new NullValueConverterDimLookup(delegate.sort());
}
}
static class TimeAndDims implements Comparable<TimeAndDims>
private static class NullValueConverterDimLookup implements SortedDimLookup
{
private final SortedDimLookup delegate;
public NullValueConverterDimLookup(SortedDimLookup delegate)
{
this.delegate = delegate;
}
@Override
public int size()
{
return delegate.size();
}
@Override
public int indexToId(int index)
{
return delegate.indexToId(index);
}
@Override
public int idToIndex(int id)
{
return delegate.idToIndex(id);
}
@Override
public String getValue(int index)
{
return Strings.emptyToNull(delegate.getValue(index));
}
}
static final class TimeAndDims
{
private final long timestamp;
private final String[][] dims;
private final int[][] dims;
TimeAndDims(
long timestamp,
String[][] dims
int[][] dims
)
{
this.timestamp = timestamp;
@ -925,61 +965,21 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return timestamp;
}
String[][] getDims()
int[][] getDims()
{
return dims;
}
@Override
public int compareTo(TimeAndDims rhs)
{
int retVal = Longs.compare(timestamp, rhs.timestamp);
int numComparisons = Math.min(dims.length, rhs.dims.length);
int index = 0;
while (retVal == 0 && index < numComparisons) {
String[] lhsVals = dims[index];
String[] rhsVals = rhs.dims[index];
if (lhsVals == null) {
if (rhsVals == null) {
++index;
continue;
}
return -1;
}
if (rhsVals == null) {
return 1;
}
retVal = Ints.compare(lhsVals.length, rhsVals.length);
int valsIndex = 0;
while (retVal == 0 && valsIndex < lhsVals.length) {
retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]);
++valsIndex;
}
++index;
}
if (retVal == 0) {
return Ints.compare(dims.length, rhs.dims.length);
}
return retVal;
}
@Override
public String toString()
{
return "TimeAndDims{" +
"timestamp=" + new DateTime(timestamp) +
", dims=" + Lists.transform(
Arrays.asList(dims), new Function<String[], Object>()
Arrays.asList(dims), new Function<int[], Object>()
{
@Override
public Object apply(@Nullable String[] input)
public Object apply(@Nullable int[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
@ -990,4 +990,69 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
) + '}';
}
}
protected final Comparator<TimeAndDims> dimsComparator()
{
return new TimeAndDimsComp(dimValues);
}
@VisibleForTesting
static final class TimeAndDimsComp implements Comparator<TimeAndDims>
{
private final List<DimDim> dimValues;
public TimeAndDimsComp(List<DimDim> dimValues)
{
this.dimValues = dimValues;
}
@Override
public int compare(TimeAndDims lhs, TimeAndDims rhs)
{
int retVal = Longs.compare(lhs.timestamp, rhs.timestamp);
int numComparisons = Math.min(lhs.dims.length, rhs.dims.length);
int index = 0;
while (retVal == 0 && index < numComparisons) {
final int[] lhsIdxs = lhs.dims[index];
final int[] rhsIdxs = rhs.dims[index];
if (lhsIdxs == null) {
if (rhsIdxs == null) {
++index;
continue;
}
return -1;
}
if (rhsIdxs == null) {
return 1;
}
retVal = Ints.compare(lhsIdxs.length, rhsIdxs.length);
int valsIndex = 0;
while (retVal == 0 && valsIndex < lhsIdxs.length) {
if (lhsIdxs[valsIndex] != rhsIdxs[valsIndex]) {
final DimDim dimLookup = dimValues.get(index);
final String lhsVal = dimLookup.getValue(lhsIdxs[valsIndex]);
final String rhsVal = dimLookup.getValue(rhsIdxs[valsIndex]);
if (lhsVal != null && rhsVal != null) {
retVal = lhsVal.compareTo(rhsVal);
} else if (lhsVal == null ^ rhsVal == null) {
retVal = lhsVal == null ? -1 : 1;
}
}
++valsIndex;
}
++index;
}
if (retVal == 0) {
return Ints.compare(lhs.dims.length, rhs.dims.length);
}
return retVal;
}
}
}

View File

@ -39,7 +39,6 @@ import io.druid.segment.data.ListIndexed;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@ -53,9 +52,40 @@ public class IncrementalIndexAdapter implements IndexableAdapter
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval;
private final IncrementalIndex<?> index;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
private final Set<String> hasNullValueDimensions;
private final Metadata metadata;
private final Map<String, DimensionIndexer> indexers;
private class DimensionIndexer
{
private final IncrementalIndex.DimensionDesc dimensionDesc;
private final MutableBitmap[] invertedIndexes;
private IncrementalIndex.SortedDimLookup dimLookup;
public DimensionIndexer(IncrementalIndex.DimensionDesc dimensionDesc)
{
this.dimensionDesc = dimensionDesc;
this.invertedIndexes = new MutableBitmap[dimensionDesc.getValues().size() + 1];
}
private IncrementalIndex.DimDim getDimValues()
{
return dimensionDesc.getValues();
}
private IncrementalIndex.SortedDimLookup getDimLookup()
{
if (dimLookup == null) {
final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues();
if (hasNullValueDimensions.contains(dimensionDesc.getName()) && !dimDim.contains(null)) {
dimDim.add(null);
}
dimLookup = dimDim.sort();
}
return dimLookup;
}
}
public IncrementalIndexAdapter(
Interval dataInterval, IncrementalIndex<?> index, BitmapFactory bitmapFactory
@ -63,9 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
this.dataInterval = dataInterval;
this.index = index;
this.metadata = index.getMetadata();
this.invertedIndexes = Maps.newHashMap();
/* Sometimes it's hard to tell whether one dimension contains a null value or not.
* If one dimension had show a null or empty value explicitly, then yes, it contains
* null value. But if one dimension's values are all non-null, it still early to say
@ -79,40 +107,35 @@ public class IncrementalIndexAdapter implements IndexableAdapter
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
indexers = Maps.newHashMapWithExpectedSize(dimensions.size());
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
invertedIndexes.put(dimension.getName(), Maps.<String, MutableBitmap>newHashMap());
indexers.put(dimension.getName(), new DimensionIndexer(dimension));
}
int rowNum = 0;
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final String[][] dims = timeAndDims.getDims();
final int[][] dims = timeAndDims.getDims();
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
final int dimIndex = dimension.getIndex();
final Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension.getName());
if (bitmapIndexes == null || dims == null) {
log.error("bitmapIndexes and dims are null!");
continue;
}
DimensionIndexer indexer = indexers.get(dimension.getName());
if (dimIndex >= dims.length || dims[dimIndex] == null) {
hasNullValueDimensions.add(dimension.getName());
continue;
}
if (hasNullValue(dims[dimIndex])) {
final IncrementalIndex.DimDim values = dimension.getValues();
if (hasNullValue(values, dims[dimIndex])) {
hasNullValueDimensions.add(dimension.getName());
}
for (String dimValue : dims[dimIndex]) {
MutableBitmap mutableBitmap = bitmapIndexes.get(dimValue);
final MutableBitmap[] bitmapIndexes = indexer.invertedIndexes;
if (mutableBitmap == null) {
mutableBitmap = bitmapFactory.makeEmptyMutableBitmap();
bitmapIndexes.put(dimValue, mutableBitmap);
for (int dimIdx : dims[dimIndex]) {
if (bitmapIndexes[dimIdx] == null) {
bitmapIndexes[dimIdx] = bitmapFactory.makeEmptyMutableBitmap();
}
try {
mutableBitmap.add(rowNum);
bitmapIndexes[dimIdx].add(rowNum);
}
catch (Exception e) {
log.info(e.toString());
@ -151,51 +174,46 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
if (dimDim != null) {
if (hasNullValueDimensions.contains(dimension)
&& !dimDim.contains(null))
{
dimDim.add(null);
}
dimDim.sort();
return new Indexed<String>()
{
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dimDim.size();
}
@Override
public String get(int index)
{
return dimDim.getSortedValue(index);
}
@Override
public int indexOf(String value)
{
return dimDim.getSortedId(value);
}
@Override
public Iterator<String> iterator()
{
return IndexedIterable.create(this).iterator();
}
};
} else {
final DimensionIndexer indexer = indexers.get(dimension);
if (indexer == null) {
return null;
}
final IncrementalIndex.DimDim dimDim = indexer.getDimValues();
final IncrementalIndex.SortedDimLookup dimLookup = indexer.getDimLookup();
return new Indexed<String>()
{
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dimLookup.size();
}
@Override
public String get(int index)
{
return dimLookup.getValue(index);
}
@Override
public int indexOf(String value)
{
int id = dimDim.getId(value);
return id < 0 ? -1 : dimLookup.idToIndex(id);
}
@Override
public Iterator<String> iterator()
{
return IndexedIterable.create(this).iterator();
}
};
}
@Override
@ -207,31 +225,32 @@ public class IncrementalIndexAdapter implements IndexableAdapter
public Iterator<Rowboat> iterator()
{
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
final IncrementalIndex.SortedDimLookup[] dimLookups = new IncrementalIndex.SortedDimLookup[dimensions.size()];
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
dimLookups[dimension.getIndex()] = indexers.get(dimension.getName()).getDimLookup();
}
/*
* Note that the transform function increments a counter to determine the rowNum of
* the iterated Rowboats. We need to return a new iterator on each
* iterator() call to ensure the counter starts at 0.
*/
return (Iterators.transform(
return Iterators.transform(
index.getFacts().entrySet().iterator(),
new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>()
{
int count = 0;
@Override
public Rowboat apply(
@Nullable Map.Entry<IncrementalIndex.TimeAndDims, Integer> input
)
public Rowboat apply(Map.Entry<IncrementalIndex.TimeAndDims, Integer> input)
{
final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
final String[][] dimValues = timeAndDims.getDims();
final int[][] dimValues = timeAndDims.getDims();
final int rowOffset = input.getValue();
int[][] dims = new int[dimValues.length][];
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
final int dimIndex = dimension.getIndex();
final IncrementalIndex.DimDim dimDim = dimension.getValues();
dimDim.sort();
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {
continue;
@ -244,7 +263,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
for (int i = 0; i < dimValues[dimIndex].length; ++i) {
dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]);
dims[dimIndex][i] = dimLookups[dimIndex].idToIndex(dimValues[dimIndex][i]);
}
}
@ -261,21 +280,26 @@ public class IncrementalIndexAdapter implements IndexableAdapter
);
}
}
));
);
}
};
}
@Override
public IndexedInts getBitmapIndex(String dimension, String value)
public IndexedInts getBitmapIndex(String dimension, int index)
{
Map<String, MutableBitmap> dimInverted = invertedIndexes.get(dimension);
if (dimInverted == null) {
DimensionIndexer accessor = indexers.get(dimension);
if (accessor == null) {
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}
final MutableBitmap bitmapIndex = dimInverted.get(value);
IncrementalIndex.SortedDimLookup dimLookup = accessor.getDimLookup();
final int id = dimLookup.indexToId(index);
if (id < 0 || id >= dimLookup.size()) {
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}
MutableBitmap bitmapIndex = accessor.invertedIndexes[id];
if (bitmapIndex == null) {
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
@ -296,27 +320,13 @@ public class IncrementalIndexAdapter implements IndexableAdapter
return index.getCapabilities(column);
}
@Override
public IndexedInts getBitmapIndex(String dimension, int dictId)
private boolean hasNullValue(IncrementalIndex.DimDim dimDim, int[] dimIndices)
{
if (dictId >= 0) {
final Indexed<String> dimValues = getDimValueLookup(dimension);
//NullValueConverterDimDim will convert empty to null, we need convert it back to the actual values,
//because getBitmapIndex relies on the actual values stored in DimDim.
String value = Strings.nullToEmpty(dimValues.get(dictId));
return getBitmapIndex(dimension, value);
} else {
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}
}
private boolean hasNullValue(String[] dimValues)
{
if (dimValues == null || dimValues.length == 0) {
if (dimIndices == null || dimIndices.length == 0) {
return true;
}
for (String dimVal : dimValues) {
if (Strings.isNullOrEmpty(dimVal)) {
for (int dimIndex : dimIndices) {
if (Strings.isNullOrEmpty(dimDim.getValue(dimIndex))) {
return true;
}
}
@ -382,13 +392,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public void close() throws IOException
{
}
}
@Override
public Metadata getMetadata()
{
return metadata;
return index.getMetadata();
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -59,7 +60,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -220,10 +220,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
cursorMap = index.getSubMap(
new IncrementalIndex.TimeAndDims(
timeStart, new String[][]{}
timeStart, new int[][]{}
),
new IncrementalIndex.TimeAndDims(
Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{}
Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{}
)
);
if (descending) {
@ -329,57 +329,52 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn, descending);
}
final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension);
if (dimValLookup == null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
return NULL_DIMENSION_SELECTOR;
}
final int dimIndex = dimensionDesc.getIndex();
final IncrementalIndex.DimDim dimValLookup = dimensionDesc.getValues();
final int maxId = dimValLookup.size();
final int dimIndex = index.getDimensionIndex(dimension);
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimIndex < currEntry.getKey().getDims().length) {
final String[] dimVals = currEntry.getKey().getDims()[dimIndex];
if (dimVals != null) {
for (String dimVal : dimVals) {
int id = dimValLookup.getId(dimVal);
if (id < maxId) {
vals.add(id);
}
}
}
final int[][] dims = currEntry.getKey().getDims();
int[] indices = dimIndex < dims.length ? dims[dimIndex] : null;
if (indices == null) {
indices = new int[0];
}
// check for null entry
if (vals.isEmpty() && dimValLookup.contains(null)) {
int id = dimValLookup.getId(null);
if (id < maxId) {
vals.add(id);
}
if (indices.length == 0 && dimValLookup.contains(null)) {
indices = new int[] { dimValLookup.getId(null) };
}
final int[] vals = indices;
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
return vals.length;
}
@Override
public int get(int index)
{
return vals.get(index);
return vals[index];
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
return Ints.asList(vals).iterator();
}
@Override
@ -533,10 +528,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
};
}
final Integer dimensionIndexInt = index.getDimensionIndex(column);
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column);
if (dimensionDesc != null) {
final int dimensionIndex = dimensionDesc.getIndex();
final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues();
if (dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<Object>()
{
@Override
@ -553,17 +551,21 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return null;
}
String[][] dims = key.getDims();
int[][] dims = key.getDims();
if (dimensionIndex >= dims.length) {
return null;
}
final String[] dimVals = dims[dimensionIndex];
if (dimVals == null || dimVals.length == 0) {
final int[] dimIdx = dims[dimensionIndex];
if (dimIdx == null || dimIdx.length == 0) {
return null;
}
if (dimVals.length == 1) {
return dimVals[0];
if (dimIdx.length == 1) {
return dimDim.getValue(dimIdx[0]);
}
String[] dimVals = new String[dimIdx.length];
for (int i = 0; i < dimIdx.length; i++) {
dimVals[i] = dimDim.getValue(dimIdx[i]);
}
return dimVals;
}
@ -624,21 +626,22 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public ValueMatcher makeValueMatcher(String dimension, final String value)
{
Integer dimIndexObject = index.getDimensionIndex(dimension);
if (dimIndexObject == null) {
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
return new BooleanValueMatcher(Strings.isNullOrEmpty(value));
}
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
if (!dimDim.contains(value)) {
if (Strings.isNullOrEmpty(value)) {
final int dimIndex = dimIndexObject;
final int dimIndex = dimensionDesc.getIndex();
final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues();
final Integer id = dimDim.getId(value);
if (id == null) {
if (Strings.isNullOrEmpty(value)) {
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return true;
}
@ -649,25 +652,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
final String id = dimDim.get(value);
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return Strings.isNullOrEmpty(value);
}
for (String dimVal : dims[dimIndex]) {
if (dimDim.compareCanonicalValues(id, dimVal)) {
return true;
}
}
return false;
return Ints.indexOf(dims[dimIndex], id) >= 0;
}
};
}
@ -675,24 +670,25 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public ValueMatcher makeValueMatcher(String dimension, final Predicate<String> predicate)
{
Integer dimIndexObject = index.getDimensionIndex(dimension);
if (dimIndexObject == null) {
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
final int dimIndex = dimensionDesc.getIndex();
final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues();
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return predicate.apply(null);
}
for (String dimVal : dims[dimIndex]) {
if (predicate.apply(dimVal)) {
for (int dimVal : dims[dimIndex]) {
if (predicate.apply(dimDim.getValue(dimVal))) {
return true;
}
}
@ -704,24 +700,25 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public ValueMatcher makeValueMatcher(final String dimension, final Bound bound)
{
Integer dimIndexObject = index.getDimensionIndex(dimension);
if (dimIndexObject == null) {
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
final int dimIndex = dimensionDesc.getIndex();
final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues();
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : dims[dimIndex]) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
for (int dimVal : dims[dimIndex]) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimDim.getValue(dimVal)));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));

View File

@ -1,486 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.mapdb.Store;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
@Deprecated
/**
* This is not yet ready for production use and requires more work.
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
private static final long STORE_CHUNK_SIZE;
static
{
// MapDB allocated memory in chunks. We need to know CHUNK_SIZE
// in order to get a crude estimate of how much more direct memory
// might be used when adding an additional row.
try {
Field field = Store.class.getDeclaredField("CHUNK_SIZE");
field.setAccessible(true);
STORE_CHUNK_SIZE = field.getLong(null);
} catch(NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Unable to determine MapDB store chunk size", e);
}
}
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
private final DB factsDb;
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final int maxTotalBufferSize;
private String outOfRowsReason = null;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics,
int maxTotalBufferSize
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.bufferHolder = bufferPool.take();
Preconditions.checkArgument(
maxTotalBufferSize > bufferHolder.get().limit(),
"Maximum total buffer size must be greater than aggregation buffer size"
);
final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics();
this.aggPositionOffsets = new int[metrics.length];
int currAggSize = 0;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggPositionOffsets[i] = currAggSize;
currAggSize += agg.getMaxIntermediateSize();
}
this.totalAggSize = currAggSize;
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheLRUEnable()
.cacheSize(16384);
this.factsDb = dbMaker.make();
this.db = dbMaker.make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
this.maxTotalBufferSize = maxTotalBufferSize;
}
public OffheapIncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics,
int maxTotalBufferSize
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics,
maxTotalBufferSize
);
}
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
@Override
protected DimDim makeDimDim(String dimension)
{
return new OffheapDimDim(dimension);
}
@Override
protected BufferAggregator[] initAggs(
AggregatorFactory[] metrics,
Supplier<InputRow> rowSupplier,
boolean deserializeComplexMetrics
)
{
BufferAggregator[] aggs = new BufferAggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
return aggs;
}
@Override
protected Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
InputRow row,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException
{
final BufferAggregator[] aggs = getAggs();
Integer rowOffset;
synchronized (this) {
if (!facts.containsKey(key)) {
if (!canAppendRow(false)) {
throw new IndexSizeExceededException("%s", getOutOfRowsReason());
}
}
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
}
rowContainer.set(row);
for (int i = 0; i < aggs.length; i++) {
synchronized (aggs[i]) {
aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i));
}
}
rowContainer.set(null);
return numEntries.get();
}
public boolean canAppendRow() {
return canAppendRow(true);
}
private boolean canAppendRow(boolean includeFudgeFactor)
{
// there is a race condition when checking current MapDB
// when canAppendRow() is called after adding a row it may return true, but on a subsequence call
// to addToFacts that may not be the case anymore because MapDB size may have changed.
// so we add this fudge factor, hoping that will be enough.
final int aggBufferSize = bufferHolder.get().limit();
if ((size() + 1) * totalAggSize > aggBufferSize) {
outOfRowsReason = String.format("Maximum aggregation buffer limit reached [%d bytes].", aggBufferSize);
return false;
}
// hopefully both MapDBs will grow by at most STORE_CHUNK_SIZE each when we add the next row.
if (getCurrentSize() + totalAggSize + 2 * STORE_CHUNK_SIZE + (includeFudgeFactor ? STORE_CHUNK_SIZE : 0) > maxTotalBufferSize) {
outOfRowsReason = String.format("Maximum time and dimension buffer limit reached [%d bytes].", maxTotalBufferSize - aggBufferSize);
return false;
}
return true;
}
public String getOutOfRowsReason() {
return outOfRowsReason;
}
@Override
protected BufferAggregator[] getAggsForRow(int rowOffset)
{
return getAggs();
}
@Override
protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
return agg.get(bufferHolder.get(), getMetricPosition(rowOffset, aggPosition));
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return getAggs()[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset));
}
@Override
public void close()
{
try {
bufferHolder.close();
Store.forDB(db).close();
Store.forDB(factsDb).close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
}
private DimDim getDimDim(int dimIndex)
{
return getDimensionValues(getDimensionNames().get(dimIndex));
}
// MapDB forces serializers to implement serializable, which sucks
private static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
private final TimeAndDimsComparator comparator;
private final transient OffheapIncrementalIndex incrementalIndex;
TimeAndDimsSerializer(OffheapIncrementalIndex incrementalIndex)
{
this.comparator = new TimeAndDimsComparator();
this.incrementalIndex = incrementalIndex;
}
@Override
public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
{
for (int i = start; i < end; i++) {
TimeAndDims timeAndDim = (TimeAndDims) keys[i];
out.writeLong(timeAndDim.getTimestamp());
out.writeInt(timeAndDim.getDims().length);
int index = 0;
for (String[] dims : timeAndDim.getDims()) {
if (dims == null) {
out.write(-1);
} else {
DimDim dimDim = incrementalIndex.getDimDim(index);
out.writeInt(dims.length);
for (String value : dims) {
out.writeInt(dimDim.getId(value));
}
}
index++;
}
}
}
@Override
public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
{
Object[] ret = new Object[size];
for (int i = start; i < end; i++) {
final long timeStamp = in.readLong();
final String[][] dims = new String[in.readInt()][];
for (int k = 0; k < dims.length; k++) {
int len = in.readInt();
if (len != -1) {
DimDim dimDim = incrementalIndex.getDimDim(k);
String[] col = new String[len];
for (int l = 0; l < col.length; l++) {
col[l] = dimDim.get(dimDim.getValue(in.readInt()));
}
dims[k] = col;
}
}
ret[i] = new TimeAndDims(timeStamp, dims);
}
return ret;
}
@Override
public Comparator<TimeAndDims> getComparator()
{
return comparator;
}
}
private static class TimeAndDimsComparator implements Comparator, Serializable
{
@Override
public int compare(Object o1, Object o2)
{
return ((TimeAndDims) o1).compareTo((TimeAndDims) o2);
}
}
private class OffheapDimDim implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private final WeakHashMap<String, WeakReference<String>> cache = new WeakHashMap();
private volatile String[] sortedVals = null;
// size on MapDB is slow so maintain a count here
private volatile int size = 0;
public OffheapDimDim(String dimension)
{
falseIds = db.createHashMap(dimension)
.keySerializer(Serializer.STRING)
.valueSerializer(Serializer.INTEGER)
.make();
falseIdsReverse = db.createHashMap(dimension + "_inverse")
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.STRING)
.make();
}
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
*
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
{
final WeakReference<String> cached = cache.get(str);
if (cached != null) {
final String value = cached.get();
if (value != null) {
return value;
}
}
cache.put(str, new WeakReference(str));
return str;
}
public int getId(String value)
{
return falseIds.get(value);
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return size;
}
public synchronized int add(String value)
{
int id = size++;
falseIds.put(value, id);
falseIdsReverse.put(id, value);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
public boolean compareCanonicalValues(String s1, String s2)
{
return s1.equals(s2);
}
}
private long getCurrentSize()
{
return Store.forDB(db).getCurrSize() +
Store.forDB(factsDb).getCurrSize()
// Size of aggregators
+ size() * totalAggSize;
}
}

View File

@ -20,10 +20,8 @@
package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
@ -35,7 +33,7 @@ import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -48,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
@ -63,6 +61,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.maxRowCount = maxRowCount;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
}
public OnheapIncrementalIndex(
@ -115,9 +114,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
@Override
protected DimDim makeDimDim(String dimension)
protected DimDim makeDimDim(String dimension, Object lock)
{
return new OnHeapDimDim();
return new OnHeapDimDim(lock);
}
@Override
@ -275,96 +274,116 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
final ConcurrentMap<String, String> poorMansInterning = Maps.newConcurrentMap();
private final Map<String, Integer> valueToId = Maps.newHashMap();
public OnHeapDimDim()
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
private final List<String> idToValue = Lists.newArrayList();
private final Object lock;
/**
* Returns the interned String value to allow fast comparisons using `==` instead of `.equals()`
*
* @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String)
*/
public String get(String str)
public OnHeapDimDim(Object lock)
{
String prev = poorMansInterning.putIfAbsent(str, str);
return prev != null ? prev : str;
this.lock = lock;
}
public int getId(String value)
{
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
synchronized (lock) {
final Integer id = valueToId.get(value);
return id == null ? -1 : id;
}
}
public String getValue(int id)
{
return falseIdsReverse.get(id);
synchronized (lock) {
return idToValue.get(id);
}
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
synchronized (lock) {
return valueToId.containsKey(value);
}
}
public int size()
{
return falseIds.size();
synchronized (lock) {
return valueToId.size();
}
}
public synchronized int add(String value)
public int add(String value)
{
int id = falseIds.size();
falseIds.put(value, id);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[falseIds.size()];
int index = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
synchronized (lock) {
Integer prev = valueToId.get(value);
if (prev != null) {
return prev;
}
Arrays.sort(sortedVals);
final int index = size();
valueToId.put(value, index);
idToValue.add(value);
return index;
}
}
private void assertSorted()
public OnHeapDimLookup sort()
{
if (sortedVals == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
synchronized (lock) {
return new OnHeapDimLookup(idToValue, size());
}
}
public boolean compareCanonicalValues(String s1, String s2)
{
return s1 == s2;
}
}
// Caches references to selector objetcs for each column instead of creating a new object each time in order to save heap space.
private static class OnHeapDimLookup implements SortedDimLookup
{
private final String[] sortedVals;
private final int[] idToIndex;
private final int[] indexToId;
public OnHeapDimLookup(List<String> idToValue, int length)
{
Map<String, Integer> sortedMap = Maps.newTreeMap();
for (int id = 0; id < length; id++) {
sortedMap.put(idToValue.get(id), id);
}
this.sortedVals = sortedMap.keySet().toArray(new String[length]);
this.idToIndex = new int[length];
this.indexToId = new int[length];
int index = 0;
for (Integer id : sortedMap.values()) {
idToIndex[id] = index;
indexToId[index] = id;
index++;
}
}
@Override
public int size()
{
return sortedVals.length;
}
@Override
public int indexToId(int index)
{
return indexToId[index];
}
@Override
public String getValue(int index)
{
return sortedVals[index];
}
@Override
public int idToIndex(int id)
{
return idToIndex[id];
}
}
// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory

View File

@ -47,9 +47,9 @@ import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import io.druid.segment.incremental.IndexSizeExceededException;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
@ -219,12 +219,8 @@ public class IndexMergerTest
Assert.assertEquals(3, index.getColumnNames().size());
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(2, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
@ -846,12 +842,8 @@ public class IndexMergerTest
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size());
@ -945,12 +937,8 @@ public class IndexMergerTest
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(4, boatList.size());
@ -1040,11 +1028,11 @@ public class IndexMergerTest
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
List<Rowboat> boatList2 = ImmutableList.copyOf(adapter2.getRows());
final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
final List<Rowboat> boatList2 = ImmutableList.copyOf(adapter2.getRows());
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size());
@ -1187,12 +1175,8 @@ public class IndexMergerTest
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
Assert.assertEquals(
ImmutableList.of("d2", "d3", "d5", "d6", "d7", "d8", "d9"),
@ -1234,14 +1218,12 @@ public class IndexMergerTest
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921"));
}
private void checkBitmapIndex(ArrayList<Integer> expectIndex, IndexedInts index)
private void checkBitmapIndex(ArrayList<Integer> expected, IndexedInts real)
{
Assert.assertEquals(expectIndex.size(), index.size());
Assert.assertEquals(expected.size(), real.size());
int i = 0;
Iterator it = index.iterator();
while (it.hasNext()) {
Assert.assertEquals(expectIndex.get(i), it.next());
i++;
for (Object index : real) {
Assert.assertEquals(expected.get(i++), index);
}
}
@ -1361,13 +1343,11 @@ public class IndexMergerTest
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = ImmutableList.copyOf(boats);
final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
final List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
Iterable<Rowboat> boats2 = adapter2.getRows();
List<Rowboat> boatList2 = ImmutableList.copyOf(boats2);
final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
final List<Rowboat> boatList2 = ImmutableList.copyOf(adapter2.getRows());
Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size());

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.joda.time.DateTime;
@ -71,23 +70,6 @@ public class IncrementalIndexTest
}
}
},
{
new IndexCreator()
{
@Override
public IncrementalIndex createIndex()
{
return new OffheapIncrementalIndex(
0,
QueryGranularity.MINUTE,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
}
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import static io.druid.segment.incremental.IncrementalIndex.TimeAndDims;
/**
*/
public class TimeAndDimsCompTest
{
@Test
public void testBasic() throws IndexSizeExceededException
{
IncrementalIndex index = new OnheapIncrementalIndex(
0, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
);
long time = System.currentTimeMillis();
TimeAndDims td1 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "B"));
TimeAndDims td2 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "A"));
TimeAndDims td3 = index.toTimeAndDims(toMapRow(time, "billy", "A"));
TimeAndDims td4 = index.toTimeAndDims(toMapRow(time + 1, "billy", "A", "joe", "B"));
TimeAndDims td5 = index.toTimeAndDims(toMapRow(time + 1, "billy", "A", "joe", Arrays.asList("A", "B")));
TimeAndDims td6 = index.toTimeAndDims(toMapRow(time + 1));
Comparator<IncrementalIndex.TimeAndDims> comparator = index.dimsComparator();
Assert.assertEquals(0, comparator.compare(td1, td1));
Assert.assertEquals(0, comparator.compare(td2, td2));
Assert.assertEquals(0, comparator.compare(td3, td3));
Assert.assertTrue(comparator.compare(td1, td2) > 0);
Assert.assertTrue(comparator.compare(td2, td1) < 0);
Assert.assertTrue(comparator.compare(td2, td3) > 0);
Assert.assertTrue(comparator.compare(td3, td2) < 0);
Assert.assertTrue(comparator.compare(td1, td3) > 0);
Assert.assertTrue(comparator.compare(td3, td1) < 0);
Assert.assertTrue(comparator.compare(td6, td1) > 0);
Assert.assertTrue(comparator.compare(td6, td2) > 0);
Assert.assertTrue(comparator.compare(td6, td3) > 0);
Assert.assertTrue(comparator.compare(td4, td6) > 0);
Assert.assertTrue(comparator.compare(td5, td6) > 0);
Assert.assertTrue(comparator.compare(td4, td5) < 0);
Assert.assertTrue(comparator.compare(td5, td4) > 0);
}
private MapBasedInputRow toMapRow(long time, Object... dimAndVal)
{
Map<String, Object> data = Maps.newHashMap();
for (int i = 0; i < dimAndVal.length; i += 2) {
data.put((String) dimAndVal[i], dimAndVal[i + 1]);
}
return new MapBasedInputRow(time, Lists.newArrayList(data.keySet()), data);
}
}