From 1d602be0f9771f6db53fcff5cb8c37e2c2279dd5 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 11 Dec 2015 20:45:16 +0900 Subject: [PATCH] Replace string[] with int[] for dimensions --- .../io/druid/segment/IndexableAdapter.java | 2 - .../QueryableIndexIndexableAdapter.java | 5 +- .../segment/RowboatFilteringIndexAdapter.java | 6 - .../segment/incremental/IncrementalIndex.java | 323 +++++++----- .../incremental/IncrementalIndexAdapter.java | 205 ++++---- .../IncrementalIndexStorageAdapter.java | 121 +++-- .../incremental/OffheapIncrementalIndex.java | 486 ------------------ .../incremental/OnheapIncrementalIndex.java | 151 +++--- .../io/druid/segment/IndexMergerTest.java | 62 +-- .../incremental/IncrementalIndexTest.java | 18 - .../incremental/TimeAndDimsCompTest.java | 88 ++++ 11 files changed, 557 insertions(+), 910 deletions(-) delete mode 100644 processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java create mode 100644 processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java diff --git a/processing/src/main/java/io/druid/segment/IndexableAdapter.java b/processing/src/main/java/io/druid/segment/IndexableAdapter.java index 6b9cae56a3f..e3aaed1e7e3 100644 --- a/processing/src/main/java/io/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/IndexableAdapter.java @@ -41,8 +41,6 @@ public interface IndexableAdapter Iterable getRows(); - IndexedInts getBitmapIndex(String dimension, String value); - IndexedInts getBitmapIndex(String dimension, int dictId); String getMetricType(String metric); diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index b18727511e2..7cb680a242c 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -19,6 +19,7 @@ package io.druid.segment; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; @@ -294,8 +295,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter }; } - @Override - public IndexedInts getBitmapIndex(String dimension, String value) + @VisibleForTesting + IndexedInts getBitmapIndex(String dimension, String value) { final Column column = input.getColumn(dimension); diff --git a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java index ec77ab78ecc..b4dcc742de1 100644 --- a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java @@ -75,12 +75,6 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter return Iterables.filter(baseAdapter.getRows(), filter); } - @Override - public IndexedInts getBitmapIndex(String dimension, String value) - { - return baseAdapter.getBitmapIndex(dimension, value); - } - @Override public String getMetricType(String metric) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 2d2d5c42f6b..46f1a79194a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ 
b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -19,6 +19,7 @@ package io.druid.segment.incremental; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Supplier; @@ -58,10 +59,13 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -268,6 +272,7 @@ public abstract class IncrementalIndex implements Iterable, private final Map metricDescs; private final Map dimensionDescs; private final Map columnCapabilities; + private final List dimValues; private final AtomicInteger numEntries = new AtomicInteger(); @@ -316,13 +321,11 @@ public abstract class IncrementalIndex implements Iterable, DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec(); this.dimensionDescs = Maps.newLinkedHashMap(); + this.dimValues = Collections.synchronizedList(Lists.newArrayList()); for (String dimension : dimensionsSpec.getDimensions()) { ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); capabilities.setType(ValueType.STRING); - dimensionDescs.put( - dimension, - new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities) - ); + addNewDimension(dimension, capabilities); columnCapabilities.put(dimension, capabilities); } @@ -341,7 +344,7 @@ public abstract class IncrementalIndex implements Iterable, private DimDim newDimDim(String dimension) { - return new NullValueConverterDimDim(makeDimDim(dimension)); + return new NullValueConverterDimDim(makeDimDim(dimension, dimensionDescs)); } public abstract ConcurrentNavigableMap getFacts(); @@ -351,7 +354,7 @@ public abstract class IncrementalIndex implements Iterable, public abstract String getOutOfRowsReason(); // use newDimDim - protected abstract DimDim makeDimDim(String dimension); + protected abstract DimDim makeDimDim(String dimension, Object lock); protected abstract AggregatorType[] initAggs( AggregatorFactory[] metrics, @@ -383,7 +386,7 @@ public abstract class IncrementalIndex implements Iterable, @Override public void close() { - // Nothing to close + dimValues.clear(); } public InputRow formatRow(InputRow row) @@ -410,7 +413,15 @@ public abstract class IncrementalIndex implements Iterable, * * @return the number of rows in the data set after adding the InputRow */ - public int add(InputRow row) throws IndexSizeExceededException + public int add(InputRow row) throws IndexSizeExceededException { + TimeAndDims key = toTimeAndDims(row); + final int rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier); + updateMaxIngestedTime(row.getTimestamp()); + return rv; + } + + @VisibleForTesting + TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException { row = formatRow(row); if (row.getTimestampFromEpoch() < minTimestamp) { @@ -419,10 +430,10 @@ public abstract class IncrementalIndex implements Iterable, final List rowDimensions = row.getDimensions(); - String[][] dims; - List overflow = null; + int[][] dims; + List overflow = null; synchronized (dimensionDescs) { - dims = new String[dimensionDescs.size()][]; + dims = new int[dimensionDescs.size()][]; for (String dimension : 
rowDimensions) { List dimensionValues = row.getDimension(dimension); @@ -445,8 +456,7 @@ public abstract class IncrementalIndex implements Iterable, } if (desc == null) { - desc = new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities); - dimensionDescs.put(dimension, desc); + desc = addNewDimension(dimension, capabilities); if (overflow == null) { overflow = Lists.newArrayList(); @@ -471,7 +481,7 @@ public abstract class IncrementalIndex implements Iterable, if (overflow != null) { // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; + int[][] newDims = new int[dims.length + overflow.size()][]; System.arraycopy(dims, 0, newDims, 0, dims.length); for (int i = 0; i < overflow.size(); ++i) { newDims[dims.length + i] = overflow.get(i); @@ -479,13 +489,11 @@ public abstract class IncrementalIndex implements Iterable, dims = newDims; } - final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - final Integer rv = addToFacts(metrics, deserializeComplexMetrics, row, numEntries, key, in, rowSupplier); - updateMaxIngestedTime(row.getTimestamp()); - return rv; + long truncated = gran.truncate(row.getTimestampFromEpoch()); + return new TimeAndDims(Math.max(truncated, minTimestamp), dims); } - public synchronized void updateMaxIngestedTime(DateTime eventTime) + private synchronized void updateMaxIngestedTime(DateTime eventTime) { if (maxIngestedEventTime == null || maxIngestedEventTime.isBefore(eventTime)) { maxIngestedEventTime = eventTime; @@ -512,26 +520,26 @@ public abstract class IncrementalIndex implements Iterable, return getFacts().lastKey().getTimestamp(); } - private String[] getDimVals(final DimDim dimLookup, final List dimValues) + private int[] getDimVals(final DimDim dimLookup, final List dimValues) { - final String[] retVal = new String[dimValues.size()]; if (dimValues.size() == 0) { // NULL VALUE - if (!dimLookup.contains(null)) { - dimLookup.add(null); - } + dimLookup.add(null); return null; } - int count = 0; - for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.get(dimValue); - if (!dimLookup.contains(canonicalDimValue)) { - dimLookup.add(dimValue); - } - retVal[count] = canonicalDimValue; - count++; + + if (dimValues.size() == 1) { + return new int[]{dimLookup.add(dimValues.get(0))}; + } + + String[] dimArray = dimValues.toArray(new String[dimValues.size()]); + Arrays.sort(dimArray); + + final int[] retVal = new int[dimArray.length]; + + for (int i = 0; i < dimArray.length; i++) { + retVal[i] = dimLookup.add(dimArray[i]); } - Arrays.sort(retVal); return retVal; } @@ -594,12 +602,6 @@ public abstract class IncrementalIndex implements Iterable, return dimSpec == null ? null : dimSpec.getValues(); } - public Integer getDimensionIndex(String dimension) - { - DimensionDesc dimSpec = getDimension(dimension); - return dimSpec == null ? 
null : dimSpec.getIndex(); - } - public List getDimensionOrder() { synchronized (dimensionDescs) { @@ -609,7 +611,7 @@ public abstract class IncrementalIndex implements Iterable, /* * Currently called to initialize IncrementalIndex dimension order during index creation - * Index dimension ordering could be changed to initalize from DimensionsSpec after resolution of + * Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of * https://github.com/druid-io/druid/issues/2011 */ public void loadDimensionIterable(Iterable oldDimensionOrder) @@ -623,13 +625,25 @@ public abstract class IncrementalIndex implements Iterable, ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); capabilities.setType(ValueType.STRING); columnCapabilities.put(dim, capabilities); - DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities); - dimensionDescs.put(dim, desc); + addNewDimension(dim, capabilities); } } } } + @GuardedBy("dimensionDescs") + private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities) + { + DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities); + if (dimValues.size() != desc.getIndex()) { + throw new ISE("dimensionDescs and dimValues for [%s] is out of sync!!", dim); + } + + dimensionDescs.put(dim, desc); + dimValues.add(desc.getValues()); + return desc; + } + public List getMetricNames() { return ImmutableList.copyOf(metricDescs.keySet()); @@ -678,13 +692,13 @@ public abstract class IncrementalIndex implements Iterable, public Iterable iterableWithPostAggregations(final List postAggs, final boolean descending) { - final List dimensions = getDimensionNames(); - final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); return new Iterable() { @Override public Iterator iterator() { + final List dimensions = getDimensions(); + final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); return Iterators.transform( facts.entrySet().iterator(), new Function, Row>() @@ -695,15 +709,28 @@ public abstract class IncrementalIndex implements Iterable, final TimeAndDims timeAndDims = input.getKey(); final int rowOffset = input.getValue(); - String[][] theDims = timeAndDims.getDims(); + int[][] theDims = timeAndDims.getDims(); Map theVals = Maps.newLinkedHashMap(); for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; - if (dim != null && dim.length != 0) { - theVals.put(dimensions.get(i), dim.length == 1 ? 
dim[0] : Arrays.asList(dim)); + int[] dim = theDims[i]; + DimensionDesc dimensionDesc = dimensions.get(i); + if (dimensionDesc == null) { + continue; + } + String dimensionName = dimensionDesc.getName(); + if (dim == null || dim.length == 0) { + theVals.put(dimensionName, null); + continue; + } + if (dim.length == 1) { + theVals.put(dimensionName, Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[0]))); } else { - theVals.put(dimensions.get(i), null); + String[] dimStringValue = new String[dim.length]; + for (int j = 0; j < dimStringValue.length; j++) { + dimStringValue[j] = Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[j])); + } + theVals.put(dimensionName, dimStringValue); } } @@ -731,7 +758,7 @@ public abstract class IncrementalIndex implements Iterable, return maxIngestedEventTime; } - public static class DimensionDesc + public static final class DimensionDesc { private final int index; private final String name; @@ -767,7 +794,7 @@ public abstract class IncrementalIndex implements Iterable, } } - public static class MetricDesc + public static final class MetricDesc { private final int index; private final String name; @@ -812,8 +839,6 @@ public abstract class IncrementalIndex implements Iterable, static interface DimDim { - public String get(String value); - public int getId(String value); public String getValue(int id); @@ -824,13 +849,18 @@ public abstract class IncrementalIndex implements Iterable, public int add(String value); - public int getSortedId(String value); + public SortedDimLookup sort(); + } - public String getSortedValue(int index); + static interface SortedDimLookup + { + public int size(); - public void sort(); + public int idToIndex(int id); - public boolean compareCanonicalValues(String s1, String s2); + public int indexToId(int index); + + public String getValue(int index); } /** @@ -845,12 +875,6 @@ public abstract class IncrementalIndex implements Iterable, this.delegate = delegate; } - @Override - public String get(String value) - { - return delegate.get(Strings.nullToEmpty(value)); - } - @Override public int getId(String value) { @@ -882,38 +906,54 @@ public abstract class IncrementalIndex implements Iterable, } @Override - public int getSortedId(String value) + public SortedDimLookup sort() { - return delegate.getSortedId(Strings.nullToEmpty(value)); - } - - @Override - public String getSortedValue(int index) - { - return Strings.emptyToNull(delegate.getSortedValue(index)); - } - - @Override - public void sort() - { - delegate.sort(); - } - - @Override - public boolean compareCanonicalValues(String s1, String s2) - { - return delegate.compareCanonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2)); + return new NullValueConverterDimLookup(delegate.sort()); } } - static class TimeAndDims implements Comparable + private static class NullValueConverterDimLookup implements SortedDimLookup + { + private final SortedDimLookup delegate; + + public NullValueConverterDimLookup(SortedDimLookup delegate) + { + this.delegate = delegate; + } + + @Override + public int size() + { + return delegate.size(); + } + + @Override + public int indexToId(int index) + { + return delegate.indexToId(index); + } + + @Override + public int idToIndex(int id) + { + return delegate.idToIndex(id); + } + + @Override + public String getValue(int index) + { + return Strings.emptyToNull(delegate.getValue(index)); + } + } + + static final class TimeAndDims { private final long timestamp; - private final String[][] dims; + private final int[][] dims; TimeAndDims( 
long timestamp, - String[][] dims + int[][] dims ) { this.timestamp = timestamp; @@ -925,61 +965,21 @@ public abstract class IncrementalIndex implements Iterable, return timestamp; } - String[][] getDims() + int[][] getDims() { return dims; } - @Override - public int compareTo(TimeAndDims rhs) - { - int retVal = Longs.compare(timestamp, rhs.timestamp); - int numComparisons = Math.min(dims.length, rhs.dims.length); - - int index = 0; - while (retVal == 0 && index < numComparisons) { - String[] lhsVals = dims[index]; - String[] rhsVals = rhs.dims[index]; - - if (lhsVals == null) { - if (rhsVals == null) { - ++index; - continue; - } - return -1; - } - - if (rhsVals == null) { - return 1; - } - - retVal = Ints.compare(lhsVals.length, rhsVals.length); - - int valsIndex = 0; - while (retVal == 0 && valsIndex < lhsVals.length) { - retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]); - ++valsIndex; - } - ++index; - } - - if (retVal == 0) { - return Ints.compare(dims.length, rhs.dims.length); - } - - return retVal; - } - @Override public String toString() { return "TimeAndDims{" + "timestamp=" + new DateTime(timestamp) + ", dims=" + Lists.transform( - Arrays.asList(dims), new Function() + Arrays.asList(dims), new Function() { @Override - public Object apply(@Nullable String[] input) + public Object apply(@Nullable int[] input) { if (input == null || input.length == 0) { return Arrays.asList("null"); @@ -990,4 +990,69 @@ public abstract class IncrementalIndex implements Iterable, ) + '}'; } } + + protected final Comparator dimsComparator() + { + return new TimeAndDimsComp(dimValues); + } + + @VisibleForTesting + static final class TimeAndDimsComp implements Comparator + { + private final List dimValues; + + public TimeAndDimsComp(List dimValues) + { + this.dimValues = dimValues; + } + + @Override + public int compare(TimeAndDims lhs, TimeAndDims rhs) + { + int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); + int numComparisons = Math.min(lhs.dims.length, rhs.dims.length); + + int index = 0; + while (retVal == 0 && index < numComparisons) { + final int[] lhsIdxs = lhs.dims[index]; + final int[] rhsIdxs = rhs.dims[index]; + + if (lhsIdxs == null) { + if (rhsIdxs == null) { + ++index; + continue; + } + return -1; + } + + if (rhsIdxs == null) { + return 1; + } + + retVal = Ints.compare(lhsIdxs.length, rhsIdxs.length); + + int valsIndex = 0; + while (retVal == 0 && valsIndex < lhsIdxs.length) { + if (lhsIdxs[valsIndex] != rhsIdxs[valsIndex]) { + final DimDim dimLookup = dimValues.get(index); + final String lhsVal = dimLookup.getValue(lhsIdxs[valsIndex]); + final String rhsVal = dimLookup.getValue(rhsIdxs[valsIndex]); + if (lhsVal != null && rhsVal != null) { + retVal = lhsVal.compareTo(rhsVal); + } else if (lhsVal == null ^ rhsVal == null) { + retVal = lhsVal == null ? 
-1 : 1; + } + } + ++valsIndex; + } + ++index; + } + + if (retVal == 0) { + return Ints.compare(lhs.dims.length, rhs.dims.length); + } + + return retVal; + } + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 814252cb9e2..e1dfb720887 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -39,7 +39,6 @@ import io.druid.segment.data.ListIndexed; import org.joda.time.Interval; import org.roaringbitmap.IntIterator; -import javax.annotation.Nullable; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -53,9 +52,40 @@ public class IncrementalIndexAdapter implements IndexableAdapter private static final Logger log = new Logger(IncrementalIndexAdapter.class); private final Interval dataInterval; private final IncrementalIndex index; - private final Map> invertedIndexes; private final Set hasNullValueDimensions; - private final Metadata metadata; + + private final Map indexers; + + private class DimensionIndexer + { + private final IncrementalIndex.DimensionDesc dimensionDesc; + private final MutableBitmap[] invertedIndexes; + + private IncrementalIndex.SortedDimLookup dimLookup; + + public DimensionIndexer(IncrementalIndex.DimensionDesc dimensionDesc) + { + this.dimensionDesc = dimensionDesc; + this.invertedIndexes = new MutableBitmap[dimensionDesc.getValues().size() + 1]; + } + + private IncrementalIndex.DimDim getDimValues() + { + return dimensionDesc.getValues(); + } + + private IncrementalIndex.SortedDimLookup getDimLookup() + { + if (dimLookup == null) { + final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); + if (hasNullValueDimensions.contains(dimensionDesc.getName()) && !dimDim.contains(null)) { + dimDim.add(null); + } + dimLookup = dimDim.sort(); + } + return dimLookup; + } + } public IncrementalIndexAdapter( Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory @@ -63,9 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter { this.dataInterval = dataInterval; this.index = index; - this.metadata = index.getMetadata(); - this.invertedIndexes = Maps.newHashMap(); /* Sometimes it's hard to tell whether one dimension contains a null value or not. * If one dimension had show a null or empty value explicitly, then yes, it contains * null value. 
But if one dimension's values are all non-null, it still early to say @@ -79,40 +107,35 @@ public class IncrementalIndexAdapter implements IndexableAdapter final List dimensions = index.getDimensions(); + indexers = Maps.newHashMapWithExpectedSize(dimensions.size()); for (IncrementalIndex.DimensionDesc dimension : dimensions) { - invertedIndexes.put(dimension.getName(), Maps.newHashMap()); + indexers.put(dimension.getName(), new DimensionIndexer(dimension)); } int rowNum = 0; for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) { - final String[][] dims = timeAndDims.getDims(); + final int[][] dims = timeAndDims.getDims(); for (IncrementalIndex.DimensionDesc dimension : dimensions) { final int dimIndex = dimension.getIndex(); - final Map bitmapIndexes = invertedIndexes.get(dimension.getName()); - - if (bitmapIndexes == null || dims == null) { - log.error("bitmapIndexes and dims are null!"); - continue; - } + DimensionIndexer indexer = indexers.get(dimension.getName()); if (dimIndex >= dims.length || dims[dimIndex] == null) { hasNullValueDimensions.add(dimension.getName()); continue; } - if (hasNullValue(dims[dimIndex])) { + final IncrementalIndex.DimDim values = dimension.getValues(); + if (hasNullValue(values, dims[dimIndex])) { hasNullValueDimensions.add(dimension.getName()); } - for (String dimValue : dims[dimIndex]) { - MutableBitmap mutableBitmap = bitmapIndexes.get(dimValue); + final MutableBitmap[] bitmapIndexes = indexer.invertedIndexes; - if (mutableBitmap == null) { - mutableBitmap = bitmapFactory.makeEmptyMutableBitmap(); - bitmapIndexes.put(dimValue, mutableBitmap); + for (int dimIdx : dims[dimIndex]) { + if (bitmapIndexes[dimIdx] == null) { + bitmapIndexes[dimIdx] = bitmapFactory.makeEmptyMutableBitmap(); } - try { - mutableBitmap.add(rowNum); + bitmapIndexes[dimIdx].add(rowNum); } catch (Exception e) { log.info(e.toString()); @@ -151,51 +174,46 @@ public class IncrementalIndexAdapter implements IndexableAdapter @Override public Indexed getDimValueLookup(String dimension) { - final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension); - - if (dimDim != null) { - if (hasNullValueDimensions.contains(dimension) - && !dimDim.contains(null)) - { - dimDim.add(null); - } - dimDim.sort(); - - return new Indexed() - { - @Override - public Class getClazz() - { - return String.class; - } - - @Override - public int size() - { - return dimDim.size(); - } - - @Override - public String get(int index) - { - return dimDim.getSortedValue(index); - } - - @Override - public int indexOf(String value) - { - return dimDim.getSortedId(value); - } - - @Override - public Iterator iterator() - { - return IndexedIterable.create(this).iterator(); - } - }; - } else { + final DimensionIndexer indexer = indexers.get(dimension); + if (indexer == null) { return null; } + final IncrementalIndex.DimDim dimDim = indexer.getDimValues(); + final IncrementalIndex.SortedDimLookup dimLookup = indexer.getDimLookup(); + + return new Indexed() + { + @Override + public Class getClazz() + { + return String.class; + } + + @Override + public int size() + { + return dimLookup.size(); + } + + @Override + public String get(int index) + { + return dimLookup.getValue(index); + } + + @Override + public int indexOf(String value) + { + int id = dimDim.getId(value); + return id < 0 ? 
-1 : dimLookup.idToIndex(id); + } + + @Override + public Iterator iterator() + { + return IndexedIterable.create(this).iterator(); + } + }; } @Override @@ -207,31 +225,32 @@ public class IncrementalIndexAdapter implements IndexableAdapter public Iterator iterator() { final List dimensions = index.getDimensions(); + final IncrementalIndex.SortedDimLookup[] dimLookups = new IncrementalIndex.SortedDimLookup[dimensions.size()]; + for (IncrementalIndex.DimensionDesc dimension : dimensions) { + dimLookups[dimension.getIndex()] = indexers.get(dimension.getName()).getDimLookup(); + } + /* * Note that the transform function increments a counter to determine the rowNum of * the iterated Rowboats. We need to return a new iterator on each * iterator() call to ensure the counter starts at 0. */ - return (Iterators.transform( + return Iterators.transform( index.getFacts().entrySet().iterator(), new Function, Rowboat>() { int count = 0; @Override - public Rowboat apply( - @Nullable Map.Entry input - ) + public Rowboat apply(Map.Entry input) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); - final String[][] dimValues = timeAndDims.getDims(); + final int[][] dimValues = timeAndDims.getDims(); final int rowOffset = input.getValue(); int[][] dims = new int[dimValues.length][]; for (IncrementalIndex.DimensionDesc dimension : dimensions) { final int dimIndex = dimension.getIndex(); - final IncrementalIndex.DimDim dimDim = dimension.getValues(); - dimDim.sort(); if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) { continue; @@ -244,7 +263,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter } for (int i = 0; i < dimValues[dimIndex].length; ++i) { - dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]); + dims[dimIndex][i] = dimLookups[dimIndex].idToIndex(dimValues[dimIndex][i]); } } @@ -261,21 +280,26 @@ public class IncrementalIndexAdapter implements IndexableAdapter ); } } - )); + ); } }; } @Override - public IndexedInts getBitmapIndex(String dimension, String value) + public IndexedInts getBitmapIndex(String dimension, int index) { - Map dimInverted = invertedIndexes.get(dimension); - - if (dimInverted == null) { + DimensionIndexer accessor = indexers.get(dimension); + if (accessor == null) { return EmptyIndexedInts.EMPTY_INDEXED_INTS; } - final MutableBitmap bitmapIndex = dimInverted.get(value); + IncrementalIndex.SortedDimLookup dimLookup = accessor.getDimLookup(); + final int id = dimLookup.indexToId(index); + if (id < 0 || id >= dimLookup.size()) { + return EmptyIndexedInts.EMPTY_INDEXED_INTS; + } + + MutableBitmap bitmapIndex = accessor.invertedIndexes[id]; if (bitmapIndex == null) { return EmptyIndexedInts.EMPTY_INDEXED_INTS; @@ -296,27 +320,13 @@ public class IncrementalIndexAdapter implements IndexableAdapter return index.getCapabilities(column); } - @Override - public IndexedInts getBitmapIndex(String dimension, int dictId) + private boolean hasNullValue(IncrementalIndex.DimDim dimDim, int[] dimIndices) { - if (dictId >= 0) { - final Indexed dimValues = getDimValueLookup(dimension); - //NullValueConverterDimDim will convert empty to null, we need convert it back to the actual values, - //because getBitmapIndex relies on the actual values stored in DimDim. 
- String value = Strings.nullToEmpty(dimValues.get(dictId)); - return getBitmapIndex(dimension, value); - } else { - return EmptyIndexedInts.EMPTY_INDEXED_INTS; - } - } - - private boolean hasNullValue(String[] dimValues) - { - if (dimValues == null || dimValues.length == 0) { + if (dimIndices == null || dimIndices.length == 0) { return true; } - for (String dimVal : dimValues) { - if (Strings.isNullOrEmpty(dimVal)) { + for (int dimIndex : dimIndices) { + if (Strings.isNullOrEmpty(dimDim.getValue(dimIndex))) { return true; } } @@ -382,13 +392,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter @Override public void close() throws IOException { - } } @Override public Metadata getMetadata() { - return metadata; + return index.getMetadata(); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index ce9bc7f45d7..d430f2e8695 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -26,6 +26,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -59,7 +60,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -220,10 +220,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { cursorMap = index.getSubMap( new IncrementalIndex.TimeAndDims( - timeStart, new String[][]{} + timeStart, new int[][]{} ), new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{} + Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{} ) ); if (descending) { @@ -329,57 +329,52 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn, descending); } - final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension); - if (dimValLookup == null) { + final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); + if (dimensionDesc == null) { return NULL_DIMENSION_SELECTOR; } + final int dimIndex = dimensionDesc.getIndex(); + final IncrementalIndex.DimDim dimValLookup = dimensionDesc.getValues(); + final int maxId = dimValLookup.size(); - final int dimIndex = index.getDimensionIndex(dimension); return new DimensionSelector() { @Override public IndexedInts getRow() { - final ArrayList vals = Lists.newArrayList(); - if (dimIndex < currEntry.getKey().getDims().length) { - final String[] dimVals = currEntry.getKey().getDims()[dimIndex]; - if (dimVals != null) { - for (String dimVal : dimVals) { - int id = dimValLookup.getId(dimVal); - if (id < maxId) { - vals.add(id); - } - } - } + final int[][] dims = currEntry.getKey().getDims(); + + int[] indices = dimIndex < dims.length ? 
dims[dimIndex] : null; + if (indices == null) { + indices = new int[0]; } // check for null entry - if (vals.isEmpty() && dimValLookup.contains(null)) { - int id = dimValLookup.getId(null); - if (id < maxId) { - vals.add(id); - } + if (indices.length == 0 && dimValLookup.contains(null)) { + indices = new int[] { dimValLookup.getId(null) }; } + final int[] vals = indices; + return new IndexedInts() { @Override public int size() { - return vals.size(); + return vals.length; } @Override public int get(int index) { - return vals.get(index); + return vals[index]; } @Override public Iterator iterator() { - return vals.iterator(); + return Ints.asList(vals).iterator(); } @Override @@ -533,10 +528,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter }; } - final Integer dimensionIndexInt = index.getDimensionIndex(column); + IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(column); + + if (dimensionDesc != null) { + + final int dimensionIndex = dimensionDesc.getIndex(); + final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); - if (dimensionIndexInt != null) { - final int dimensionIndex = dimensionIndexInt; return new ObjectColumnSelector() { @Override @@ -553,17 +551,21 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return null; } - String[][] dims = key.getDims(); + int[][] dims = key.getDims(); if (dimensionIndex >= dims.length) { return null; } - final String[] dimVals = dims[dimensionIndex]; - if (dimVals == null || dimVals.length == 0) { + final int[] dimIdx = dims[dimensionIndex]; + if (dimIdx == null || dimIdx.length == 0) { return null; } - if (dimVals.length == 1) { - return dimVals[0]; + if (dimIdx.length == 1) { + return dimDim.getValue(dimIdx[0]); + } + String[] dimVals = new String[dimIdx.length]; + for (int i = 0; i < dimIdx.length; i++) { + dimVals[i] = dimDim.getValue(dimIdx[i]); } return dimVals; } @@ -624,21 +626,22 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public ValueMatcher makeValueMatcher(String dimension, final String value) { - Integer dimIndexObject = index.getDimensionIndex(dimension); - if (dimIndexObject == null) { + IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); + if (dimensionDesc == null) { return new BooleanValueMatcher(Strings.isNullOrEmpty(value)); } - final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension); - if (!dimDim.contains(value)) { - if (Strings.isNullOrEmpty(value)) { - final int dimIndex = dimIndexObject; + final int dimIndex = dimensionDesc.getIndex(); + final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); + final Integer id = dimDim.getId(value); + if (id == null) { + if (Strings.isNullOrEmpty(value)) { return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return true; } @@ -649,25 +652,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new BooleanValueMatcher(false); } - final int dimIndex = dimIndexObject; - final String id = dimDim.get(value); - return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return Strings.isNullOrEmpty(value); } - for (String dimVal : dims[dimIndex]) { - if (dimDim.compareCanonicalValues(id, dimVal)) { 
- return true; - } - } - return false; + return Ints.indexOf(dims[dimIndex], id) >= 0; } }; } @@ -675,24 +670,25 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public ValueMatcher makeValueMatcher(String dimension, final Predicate predicate) { - Integer dimIndexObject = index.getDimensionIndex(dimension); - if (dimIndexObject == null) { + IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); + if (dimensionDesc == null) { return new BooleanValueMatcher(false); } - final int dimIndex = dimIndexObject; + final int dimIndex = dimensionDesc.getIndex(); + final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return predicate.apply(null); } - for (String dimVal : dims[dimIndex]) { - if (predicate.apply(dimVal)) { + for (int dimVal : dims[dimIndex]) { + if (predicate.apply(dimDim.getValue(dimVal))) { return true; } } @@ -704,24 +700,25 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public ValueMatcher makeValueMatcher(final String dimension, final Bound bound) { - Integer dimIndexObject = index.getDimensionIndex(dimension); - if (dimIndexObject == null) { + IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); + if (dimensionDesc == null) { return new BooleanValueMatcher(false); } - final int dimIndex = dimIndexObject; + final int dimIndex = dimensionDesc.getIndex(); + final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : dims[dimIndex]) { - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + for (int dimVal : dims[dimIndex]) { + List stringCoords = Lists.newArrayList(SPLITTER.split(dimDim.getValue(dimVal))); float[] coords = new float[stringCoords.size()]; for (int j = 0; j < coords.length; j++) { coords[j] = Float.valueOf(stringCoords.get(j)); diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java deleted file mode 100644 index ad24713e923..00000000000 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.segment.incremental; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.metamx.common.ISE; -import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; -import io.druid.data.input.InputRow; -import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.BufferAggregator; -import org.mapdb.BTreeKeySerializer; -import org.mapdb.DB; -import org.mapdb.DBMaker; -import org.mapdb.Serializer; -import org.mapdb.Store; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.lang.ref.WeakReference; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Map; -import java.util.UUID; -import java.util.WeakHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.atomic.AtomicInteger; - -@Deprecated -/** - * This is not yet ready for production use and requires more work. - */ -public class OffheapIncrementalIndex extends IncrementalIndex -{ - private static final long STORE_CHUNK_SIZE; - - static - { - // MapDB allocated memory in chunks. We need to know CHUNK_SIZE - // in order to get a crude estimate of how much more direct memory - // might be used when adding an additional row. - try { - Field field = Store.class.getDeclaredField("CHUNK_SIZE"); - field.setAccessible(true); - STORE_CHUNK_SIZE = field.getLong(null); - } catch(NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Unable to determine MapDB store chunk size", e); - } - } - - private final ResourceHolder bufferHolder; - - private final DB db; - private final DB factsDb; - private final int[] aggPositionOffsets; - private final int totalAggSize; - private final ConcurrentNavigableMap facts; - private final int maxTotalBufferSize; - - private String outOfRowsReason = null; - - public OffheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool, - boolean deserializeComplexMetrics, - int maxTotalBufferSize - ) - { - super(incrementalIndexSchema, deserializeComplexMetrics); - - this.bufferHolder = bufferPool.take(); - Preconditions.checkArgument( - maxTotalBufferSize > bufferHolder.get().limit(), - "Maximum total buffer size must be greater than aggregation buffer size" - ); - - final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics(); - this.aggPositionOffsets = new int[metrics.length]; - - int currAggSize = 0; - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggPositionOffsets[i] = currAggSize; - currAggSize += agg.getMaxIntermediateSize(); - } - this.totalAggSize = currAggSize; - - final DBMaker dbMaker = DBMaker.newMemoryDirectDB() - .transactionDisable() - .asyncWriteEnable() - .cacheLRUEnable() - .cacheSize(16384); - - this.factsDb = dbMaker.make(); - this.db = dbMaker.make(); - final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); - this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); - this.maxTotalBufferSize = maxTotalBufferSize; - } - - public OffheapIncrementalIndex( - long minTimestamp, - QueryGranularity gran, - final 
AggregatorFactory[] metrics, - StupidPool bufferPool, - boolean deserializeComplexMetrics, - int maxTotalBufferSize - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - bufferPool, - deserializeComplexMetrics, - maxTotalBufferSize - ); - } - - @Override - public ConcurrentNavigableMap getFacts() - { - return facts; - } - - @Override - protected DimDim makeDimDim(String dimension) - { - return new OffheapDimDim(dimension); - } - - @Override - protected BufferAggregator[] initAggs( - AggregatorFactory[] metrics, - Supplier rowSupplier, - boolean deserializeComplexMetrics - ) - { - BufferAggregator[] aggs = new BufferAggregator[metrics.length]; - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorizeBuffered( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) - ); - } - return aggs; - } - - @Override - protected Integer addToFacts( - AggregatorFactory[] metrics, - boolean deserializeComplexMetrics, - InputRow row, - AtomicInteger numEntries, - TimeAndDims key, - ThreadLocal rowContainer, - Supplier rowSupplier - ) throws IndexSizeExceededException - { - final BufferAggregator[] aggs = getAggs(); - Integer rowOffset; - synchronized (this) { - if (!facts.containsKey(key)) { - if (!canAppendRow(false)) { - throw new IndexSizeExceededException("%s", getOutOfRowsReason()); - } - } - rowOffset = totalAggSize * numEntries.get(); - final Integer prev = facts.putIfAbsent(key, rowOffset); - if (prev != null) { - rowOffset = prev; - } else { - numEntries.incrementAndGet(); - for (int i = 0; i < aggs.length; i++) { - aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); - } - } - } - rowContainer.set(row); - for (int i = 0; i < aggs.length; i++) { - synchronized (aggs[i]) { - aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); - } - } - rowContainer.set(null); - return numEntries.get(); - } - - public boolean canAppendRow() { - return canAppendRow(true); - } - - private boolean canAppendRow(boolean includeFudgeFactor) - { - // there is a race condition when checking current MapDB - // when canAppendRow() is called after adding a row it may return true, but on a subsequence call - // to addToFacts that may not be the case anymore because MapDB size may have changed. - // so we add this fudge factor, hoping that will be enough. - - final int aggBufferSize = bufferHolder.get().limit(); - if ((size() + 1) * totalAggSize > aggBufferSize) { - outOfRowsReason = String.format("Maximum aggregation buffer limit reached [%d bytes].", aggBufferSize); - return false; - } - // hopefully both MapDBs will grow by at most STORE_CHUNK_SIZE each when we add the next row. - if (getCurrentSize() + totalAggSize + 2 * STORE_CHUNK_SIZE + (includeFudgeFactor ? 
STORE_CHUNK_SIZE : 0) > maxTotalBufferSize) { - outOfRowsReason = String.format("Maximum time and dimension buffer limit reached [%d bytes].", maxTotalBufferSize - aggBufferSize); - return false; - } - return true; - } - - public String getOutOfRowsReason() { - return outOfRowsReason; - } - - @Override - protected BufferAggregator[] getAggsForRow(int rowOffset) - { - return getAggs(); - } - - @Override - protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) - { - return agg.get(bufferHolder.get(), getMetricPosition(rowOffset, aggPosition)); - } - - @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) - { - return getAggs()[aggOffset].getFloat(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); - } - - @Override - public long getMetricLongValue(int rowOffset, int aggOffset) - { - return getAggs()[aggOffset].getLong(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); - } - - @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) - { - return getAggs()[aggOffset].get(bufferHolder.get(), getMetricPosition(rowOffset, aggOffset)); - } - - @Override - public void close() - { - try { - bufferHolder.close(); - Store.forDB(db).close(); - Store.forDB(factsDb).close(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - private int getMetricPosition(int rowOffset, int metricIndex) - { - return rowOffset + aggPositionOffsets[metricIndex]; - } - - private DimDim getDimDim(int dimIndex) - { - return getDimensionValues(getDimensionNames().get(dimIndex)); - } - - // MapDB forces serializers to implement serializable, which sucks - private static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable - { - private final TimeAndDimsComparator comparator; - private final transient OffheapIncrementalIndex incrementalIndex; - - TimeAndDimsSerializer(OffheapIncrementalIndex incrementalIndex) - { - this.comparator = new TimeAndDimsComparator(); - this.incrementalIndex = incrementalIndex; - } - - @Override - public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException - { - for (int i = start; i < end; i++) { - TimeAndDims timeAndDim = (TimeAndDims) keys[i]; - out.writeLong(timeAndDim.getTimestamp()); - out.writeInt(timeAndDim.getDims().length); - int index = 0; - for (String[] dims : timeAndDim.getDims()) { - if (dims == null) { - out.write(-1); - } else { - DimDim dimDim = incrementalIndex.getDimDim(index); - out.writeInt(dims.length); - for (String value : dims) { - out.writeInt(dimDim.getId(value)); - } - } - index++; - } - } - } - - @Override - public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException - { - Object[] ret = new Object[size]; - for (int i = start; i < end; i++) { - final long timeStamp = in.readLong(); - final String[][] dims = new String[in.readInt()][]; - for (int k = 0; k < dims.length; k++) { - int len = in.readInt(); - if (len != -1) { - DimDim dimDim = incrementalIndex.getDimDim(k); - String[] col = new String[len]; - for (int l = 0; l < col.length; l++) { - col[l] = dimDim.get(dimDim.getValue(in.readInt())); - } - dims[k] = col; - } - } - ret[i] = new TimeAndDims(timeStamp, dims); - } - return ret; - } - - @Override - public Comparator getComparator() - { - return comparator; - } - } - - private static class TimeAndDimsComparator implements Comparator, Serializable - { - @Override - public int compare(Object o1, Object o2) - { - return ((TimeAndDims) o1).compareTo((TimeAndDims) o2); - } - } 
- - private class OffheapDimDim implements DimDim - { - private final Map falseIds; - private final Map falseIdsReverse; - private final WeakHashMap> cache = new WeakHashMap(); - - private volatile String[] sortedVals = null; - // size on MapDB is slow so maintain a count here - private volatile int size = 0; - - public OffheapDimDim(String dimension) - { - falseIds = db.createHashMap(dimension) - .keySerializer(Serializer.STRING) - .valueSerializer(Serializer.INTEGER) - .make(); - falseIdsReverse = db.createHashMap(dimension + "_inverse") - .keySerializer(Serializer.INTEGER) - .valueSerializer(Serializer.STRING) - .make(); - } - - /** - * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` - * - * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) - */ - public String get(String str) - { - final WeakReference cached = cache.get(str); - if (cached != null) { - final String value = cached.get(); - if (value != null) { - return value; - } - } - cache.put(str, new WeakReference(str)); - return str; - } - - public int getId(String value) - { - return falseIds.get(value); - } - - public String getValue(int id) - { - return falseIdsReverse.get(id); - } - - public boolean contains(String value) - { - return falseIds.containsKey(value); - } - - public int size() - { - return size; - } - - public synchronized int add(String value) - { - int id = size++; - falseIds.put(value, id); - falseIdsReverse.put(id, value); - return id; - } - - public int getSortedId(String value) - { - assertSorted(); - return Arrays.binarySearch(sortedVals, value); - } - - public String getSortedValue(int index) - { - assertSorted(); - return sortedVals[index]; - } - - public void sort() - { - if (sortedVals == null) { - sortedVals = new String[falseIds.size()]; - - int index = 0; - for (String value : falseIds.keySet()) { - sortedVals[index++] = value; - } - Arrays.sort(sortedVals); - } - } - - private void assertSorted() - { - if (sortedVals == null) { - throw new ISE("Call sort() before calling the getSorted* methods."); - } - } - - public boolean compareCanonicalValues(String s1, String s2) - { - return s1.equals(s2); - } - } - - private long getCurrentSize() - { - return Store.forDB(db).getCurrSize() + - Store.forDB(factsDb).getCurrSize() - // Size of aggregators - + size() * totalAggSize; - } -} diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 455186e4e86..be1aeb2d5e4 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -20,10 +20,8 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; @@ -35,7 +33,7 @@ import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; -import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentMap; @@ -48,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class OnheapIncrementalIndex extends IncrementalIndex { private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); - private final ConcurrentNavigableMap facts = new ConcurrentSkipListMap<>(); + private final ConcurrentNavigableMap facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); protected final int maxRowCount; private volatile Map selectors; @@ -63,6 +61,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex { super(incrementalIndexSchema, deserializeComplexMetrics); this.maxRowCount = maxRowCount; + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); } public OnheapIncrementalIndex( @@ -115,9 +114,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex } @Override - protected DimDim makeDimDim(String dimension) + protected DimDim makeDimDim(String dimension, Object lock) { - return new OnHeapDimDim(); + return new OnHeapDimDim(lock); } @Override @@ -275,96 +274,116 @@ public class OnheapIncrementalIndex extends IncrementalIndex private static class OnHeapDimDim implements DimDim { - private final Map falseIds; - private final Map falseIdsReverse; - private volatile String[] sortedVals = null; - final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); + private final Map valueToId = Maps.newHashMap(); - public OnHeapDimDim() - { - BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); - falseIds = biMap; - falseIdsReverse = biMap.inverse(); - } + private final List idToValue = Lists.newArrayList(); + private final Object lock; - /** - * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` - * - * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) - */ - public String get(String str) + public OnHeapDimDim(Object lock) { - String prev = poorMansInterning.putIfAbsent(str, str); - return prev != null ? prev : str; + this.lock = lock; } public int getId(String value) { - final Integer id = falseIds.get(value); - return id == null ? -1 : id; + synchronized (lock) { + final Integer id = valueToId.get(value); + return id == null ? 
-1 : id; + } } public String getValue(int id) { - return falseIdsReverse.get(id); + synchronized (lock) { + return idToValue.get(id); + } } public boolean contains(String value) { - return falseIds.containsKey(value); + synchronized (lock) { + return valueToId.containsKey(value); + } } public int size() { - return falseIds.size(); + synchronized (lock) { + return valueToId.size(); + } } - public synchronized int add(String value) + public int add(String value) { - int id = falseIds.size(); - falseIds.put(value, id); - return id; - } - - public int getSortedId(String value) - { - assertSorted(); - return Arrays.binarySearch(sortedVals, value); - } - - public String getSortedValue(int index) - { - assertSorted(); - return sortedVals[index]; - } - - public void sort() - { - if (sortedVals == null) { - sortedVals = new String[falseIds.size()]; - - int index = 0; - for (String value : falseIds.keySet()) { - sortedVals[index++] = value; + synchronized (lock) { + Integer prev = valueToId.get(value); + if (prev != null) { + return prev; } - Arrays.sort(sortedVals); + final int index = size(); + valueToId.put(value, index); + idToValue.add(value); + return index; } } - private void assertSorted() + public OnHeapDimLookup sort() { - if (sortedVals == null) { - throw new ISE("Call sort() before calling the getSorted* methods."); + synchronized (lock) { + return new OnHeapDimLookup(idToValue, size()); } } - - public boolean compareCanonicalValues(String s1, String s2) - { - return s1 == s2; - } } - // Caches references to selector objetcs for each column instead of creating a new object each time in order to save heap space. + private static class OnHeapDimLookup implements SortedDimLookup + { + private final String[] sortedVals; + private final int[] idToIndex; + private final int[] indexToId; + + public OnHeapDimLookup(List idToValue, int length) + { + Map sortedMap = Maps.newTreeMap(); + for (int id = 0; id < length; id++) { + sortedMap.put(idToValue.get(id), id); + } + this.sortedVals = sortedMap.keySet().toArray(new String[length]); + this.idToIndex = new int[length]; + this.indexToId = new int[length]; + int index = 0; + for (Integer id : sortedMap.values()) { + idToIndex[id] = index; + indexToId[index] = id; + index++; + } + } + + @Override + public int size() + { + return sortedVals.length; + } + + @Override + public int indexToId(int index) + { + return indexToId[index]; + } + + @Override + public String getValue(int index) + { + return sortedVals[index]; + } + + @Override + public int idToIndex(int id) + { + return idToIndex[id]; + } + } + + // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. // In general the selectorFactory need not to thread-safe. // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. 
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index ab0da2b32be..ab1dc5af47d 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -47,9 +47,9 @@ import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; -import io.druid.segment.incremental.IndexSizeExceededException; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; @@ -219,12 +219,8 @@ public class IndexMergerTest Assert.assertEquals(3, index.getColumnNames().size()); assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + final List boatList = ImmutableList.copyOf(adapter.getRows()); Assert.assertEquals(2, boatList.size()); Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); @@ -846,12 +842,8 @@ public class IndexMergerTest ) ); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(3, boatList.size()); @@ -945,12 +937,8 @@ public class IndexMergerTest ) ); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(4, boatList.size()); @@ -1040,11 +1028,11 @@ public class IndexMergerTest ) ); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - List boatList = ImmutableList.copyOf(adapter.getRows()); + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); - List boatList2 = ImmutableList.copyOf(adapter2.getRows()); + final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + final List boatList2 = ImmutableList.copyOf(adapter2.getRows()); Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(5, 
boatList.size()); @@ -1187,12 +1175,8 @@ public class IndexMergerTest ) ); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); Assert.assertEquals( ImmutableList.of("d2", "d3", "d5", "d6", "d7", "d8", "d9"), @@ -1234,14 +1218,12 @@ public class IndexMergerTest checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("d9", "921")); } - private void checkBitmapIndex(ArrayList expectIndex, IndexedInts index) + private void checkBitmapIndex(ArrayList expected, IndexedInts real) { - Assert.assertEquals(expectIndex.size(), index.size()); + Assert.assertEquals(expected.size(), real.size()); int i = 0; - Iterator it = index.iterator(); - while (it.hasNext()) { - Assert.assertEquals(expectIndex.get(i), it.next()); - i++; + for (Object index : real) { + Assert.assertEquals(expected.get(i++), index); } } @@ -1361,13 +1343,11 @@ public class IndexMergerTest ) ); - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = ImmutableList.copyOf(boats); + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List boatList = ImmutableList.copyOf(adapter.getRows()); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); - Iterable boats2 = adapter2.getRows(); - List boatList2 = ImmutableList.copyOf(boats2); + final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + final List boatList2 = ImmutableList.copyOf(adapter2.getRows()); Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(5, boatList.size()); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 28045de3eec..7d7e53dabdb 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import com.metamx.common.ISE; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; -import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.joda.time.DateTime; @@ -71,23 +70,6 @@ public class IncrementalIndexTest } } - }, - { - new IndexCreator() - { - @Override - public IncrementalIndex createIndex() - { - return new OffheapIncrementalIndex( - 0, - QueryGranularity.MINUTE, - new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool, - true, - 100 * 1024 * 1024 - ); - } - } } } diff --git a/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java b/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java new file mode 100644 index 00000000000..60da4f5d4c4 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.incremental;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import static io.druid.segment.incremental.IncrementalIndex.TimeAndDims;
+
+/**
+ */
+public class TimeAndDimsCompTest
+{
+  @Test
+  public void testBasic() throws IndexSizeExceededException
+  {
+    IncrementalIndex index = new OnheapIncrementalIndex(
+        0, QueryGranularity.NONE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
+    );
+
+    long time = System.currentTimeMillis();
+    TimeAndDims td1 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "B"));
+    TimeAndDims td2 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "A"));
+    TimeAndDims td3 = index.toTimeAndDims(toMapRow(time, "billy", "A"));
+
+    TimeAndDims td4 = index.toTimeAndDims(toMapRow(time + 1, "billy", "A", "joe", "B"));
+    TimeAndDims td5 = index.toTimeAndDims(toMapRow(time + 1, "billy", "A", "joe", Arrays.asList("A", "B")));
+    TimeAndDims td6 = index.toTimeAndDims(toMapRow(time + 1));
+
+    Comparator comparator = index.dimsComparator();
+
+    Assert.assertEquals(0, comparator.compare(td1, td1));
+    Assert.assertEquals(0, comparator.compare(td2, td2));
+    Assert.assertEquals(0, comparator.compare(td3, td3));
+
+    Assert.assertTrue(comparator.compare(td1, td2) > 0);
+    Assert.assertTrue(comparator.compare(td2, td1) < 0);
+    Assert.assertTrue(comparator.compare(td2, td3) > 0);
+    Assert.assertTrue(comparator.compare(td3, td2) < 0);
+    Assert.assertTrue(comparator.compare(td1, td3) > 0);
+    Assert.assertTrue(comparator.compare(td3, td1) < 0);
+
+    Assert.assertTrue(comparator.compare(td6, td1) > 0);
+    Assert.assertTrue(comparator.compare(td6, td2) > 0);
+    Assert.assertTrue(comparator.compare(td6, td3) > 0);
+
+    Assert.assertTrue(comparator.compare(td4, td6) > 0);
+    Assert.assertTrue(comparator.compare(td5, td6) > 0);
+    Assert.assertTrue(comparator.compare(td4, td5) < 0);
+    Assert.assertTrue(comparator.compare(td5, td4) > 0);
+  }
+
+  private MapBasedInputRow toMapRow(long time, Object... dimAndVal)
+  {
+    Map data = Maps.newHashMap();
+    for (int i = 0; i < dimAndVal.length; i += 2) {
+      data.put((String) dimAndVal[i], dimAndVal[i + 1]);
+    }
+    return new MapBasedInputRow(time, Lists.newArrayList(data.keySet()), data);
+  }
+}
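
Reviewer note, not part of the patch: the change stores each row's dimensions as int[][] dictionary ids rather than String[][], with ids assigned per dimension in first-seen order (see OnHeapDimDim.add above). Below is a minimal, self-contained sketch of that encoding idea; the class and method names (DimValueDictionary, encode) are illustrative only and do not exist in Druid.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a per-dimension dictionary that turns String values into int ids,
// so a row's dimension values can be kept as int[] instead of String[].
public class DimValueDictionary
{
  // value -> id and id -> value for a single dimension; ids are handed out in first-seen order
  private final Map<String, Integer> valueToId = new HashMap<>();
  private final List<String> idToValue = new ArrayList<>();

  public int add(String value)
  {
    Integer prev = valueToId.get(value);
    if (prev != null) {
      return prev;
    }
    int id = idToValue.size();
    valueToId.put(value, id);
    idToValue.add(value);
    return id;
  }

  public String getValue(int id)
  {
    return idToValue.get(id);
  }

  public int[] encode(List<String> rowValues)
  {
    int[] ids = new int[rowValues.size()];
    for (int i = 0; i < rowValues.size(); i++) {
      ids[i] = add(rowValues.get(i));
    }
    return ids;
  }

  public static void main(String[] args)
  {
    DimValueDictionary billy = new DimValueDictionary();
    int[] row1 = billy.encode(Arrays.asList("A", "B"));  // [0, 1]
    int[] row2 = billy.encode(Arrays.asList("B"));       // [1] -- "B" keeps the id it already has
    System.out.println(Arrays.toString(row1) + " " + Arrays.toString(row2));
    System.out.println(billy.getValue(row2[0]));         // B
  }
}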
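
Also a reviewer note rather than patch content: because ids are assigned in insertion order, the new OnHeapDimLookup builds id <-> sorted-index translation arrays when the dictionary is sorted. The sketch below mirrors that construction in standalone form; the class name SortedLookupSketch is hypothetical, and it assumes values are unique and non-null, as dictionary entries are.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: given values listed in id order, build the sorted-value array plus
// the id -> sorted-index and sorted-index -> id tables (cf. OnHeapDimLookup in this patch).
public class SortedLookupSketch
{
  private final String[] sortedVals;
  private final int[] idToIndex;
  private final int[] indexToId;

  public SortedLookupSketch(List<String> idToValue)
  {
    int length = idToValue.size();
    Map<String, Integer> sortedMap = new TreeMap<>();
    for (int id = 0; id < length; id++) {
      sortedMap.put(idToValue.get(id), id);
    }
    sortedVals = sortedMap.keySet().toArray(new String[length]);
    idToIndex = new int[length];
    indexToId = new int[length];
    int index = 0;
    for (Integer id : sortedMap.values()) {
      idToIndex[id] = index;
      indexToId[index] = id;
      index++;
    }
  }

  public String getValue(int index)
  {
    return sortedVals[index];
  }

  public int idToIndex(int id)
  {
    return idToIndex[id];
  }

  public int indexToId(int index)
  {
    return indexToId[index];
  }

  public static void main(String[] args)
  {
    // "banana" got id 0 first, but sorts after "apple".
    SortedLookupSketch lookup = new SortedLookupSketch(Arrays.asList("banana", "apple", "cherry"));
    System.out.println(lookup.idToIndex(0));   // 1
    System.out.println(lookup.getValue(0));    // apple
    System.out.println(lookup.indexToId(0));   // 1
  }
}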