Remove boxing/unboxing in indexer (#4269)

* Remove boxing/unboxing in indexer

* Fix rowIndex visibility

* Cleanup
This commit is contained in:
Maksim Logvinenko 2017-05-18 03:13:53 +03:00 committed by Roman Leventov
parent daa8ef8658
commit d45dad2b44
11 changed files with 201 additions and 285 deletions

View File

@ -69,6 +69,9 @@ import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.serde.ComplexMetricColumnSerializer; import io.druid.segment.serde.ComplexMetricColumnSerializer;
import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics; import io.druid.segment.serde.ComplexMetrics;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntSortedSet;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -90,7 +93,6 @@ import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
/** /**
*/ */
@ -747,10 +749,14 @@ public class IndexMerger
mergers.get(i).processMergedRow(dims[i]); mergers.get(i).processMergedRow(dims[i]);
} }
for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) { Iterator<Int2ObjectMap.Entry<IntSortedSet>> rowsIterator = theRow.getComprisedRows().int2ObjectEntrySet().fastIterator();
final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey()); while (rowsIterator.hasNext()) {
Int2ObjectMap.Entry<IntSortedSet> comprisedRow = rowsIterator.next();
for (Integer rowNum : comprisedRow.getValue()) { final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getIntKey());
for (IntIterator setIterator = comprisedRow.getValue().iterator(); setIterator.hasNext(); /* NOP */) {
int rowNum = setIterator.nextInt();
while (conversionBuffer.position() < rowNum) { while (conversionBuffer.position() < rowNum) {
conversionBuffer.put(INVALID_ROW); conversionBuffer.put(INVALID_ROW);
} }
@ -1216,9 +1222,12 @@ public class IndexMerger
); );
for (Rowboat rowboat : Arrays.asList(lhs, rhs)) { for (Rowboat rowboat : Arrays.asList(lhs, rhs)) {
for (Map.Entry<Integer, TreeSet<Integer>> entry : rowboat.getComprisedRows().entrySet()) { Iterator<Int2ObjectMap.Entry<IntSortedSet>> entryIterator = rowboat.getComprisedRows().int2ObjectEntrySet().fastIterator();
for (Integer rowNum : entry.getValue()) { while (entryIterator.hasNext()) {
retVal.addRow(entry.getKey(), rowNum); Int2ObjectMap.Entry<IntSortedSet> entry = entryIterator.next();
for (IntIterator setIterator = entry.getValue().iterator(); setIterator.hasNext(); /* NOP */) {
retVal.addRow(entry.getIntKey(), setIterator.nextInt());
} }
} }
} }

View File

@ -53,6 +53,9 @@ import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics; import io.druid.segment.serde.ComplexMetrics;
import io.druid.segment.serde.FloatGenericColumnPartSerde; import io.druid.segment.serde.FloatGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnPartSerde; import io.druid.segment.serde.LongGenericColumnPartSerde;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntSortedSet;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -64,10 +67,10 @@ import java.nio.ByteBuffer;
import java.nio.IntBuffer; import java.nio.IntBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
public class IndexMergerV9 extends IndexMerger public class IndexMergerV9 extends IndexMerger
{ {
@ -452,16 +455,21 @@ public class IndexMergerV9 extends IndexMerger
merger.processMergedRow(dims[i]); merger.processMergedRow(dims[i]);
} }
for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) { Iterator<Int2ObjectMap.Entry<IntSortedSet>> rowsIterator = theRow.getComprisedRows().int2ObjectEntrySet().fastIterator();
final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey()); while (rowsIterator.hasNext()) {
Int2ObjectMap.Entry<IntSortedSet> comprisedRow = rowsIterator.next();
for (Integer rowNum : comprisedRow.getValue()) { final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getIntKey());
for (IntIterator setIterator = comprisedRow.getValue().iterator(); setIterator.hasNext(); /* NOP */) {
int rowNum = setIterator.nextInt();
while (conversionBuffer.position() < rowNum) { while (conversionBuffer.position() < rowNum) {
conversionBuffer.put(INVALID_ROW); conversionBuffer.put(INVALID_ROW);
} }
conversionBuffer.put(rowCount); conversionBuffer.put(rowCount);
} }
} }
if ((++rowCount % 500000) == 0) { if ((++rowCount % 500000) == 0) {
log.info("walked 500,000/%d rows in %,d millis.", rowCount, System.currentTimeMillis() - time); log.info("walked 500,000/%d rows in %,d millis.", rowCount, System.currentTimeMillis() - time);
time = System.currentTimeMillis(); time = System.currentTimeMillis();

View File

@ -19,15 +19,14 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSortedSet;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map;
import java.util.TreeSet;
public class Rowboat implements Comparable<Rowboat> public class Rowboat implements Comparable<Rowboat>
{ {
@ -35,7 +34,7 @@ public class Rowboat implements Comparable<Rowboat>
private final Object[] dims; private final Object[] dims;
private final Object[] metrics; private final Object[] metrics;
private final int rowNum; private final int rowNum;
private final Map<Integer, TreeSet<Integer>> comprisedRows; private final Int2ObjectOpenHashMap<IntSortedSet> comprisedRows;
private final DimensionHandler[] handlers; private final DimensionHandler[] handlers;
public Rowboat( public Rowboat(
@ -52,7 +51,7 @@ public class Rowboat implements Comparable<Rowboat>
this.rowNum = rowNum; this.rowNum = rowNum;
this.handlers = handlers; this.handlers = handlers;
this.comprisedRows = Maps.newHashMap(); this.comprisedRows = new Int2ObjectOpenHashMap<>();
} }
public long getTimestamp() public long getTimestamp()
@ -72,15 +71,15 @@ public class Rowboat implements Comparable<Rowboat>
public void addRow(int indexNum, int rowNum) public void addRow(int indexNum, int rowNum)
{ {
TreeSet<Integer> rowNums = comprisedRows.get(indexNum); IntSortedSet rowNums = comprisedRows.get(indexNum);
if (rowNums == null) { if (rowNums == null) {
rowNums = Sets.newTreeSet(); rowNums = new IntRBTreeSet();
comprisedRows.put(indexNum, rowNums); comprisedRows.put(indexNum, rowNums);
} }
rowNums.add(rowNum); rowNums.add(rowNum);
} }
public Map<Integer, TreeSet<Integer>> getComprisedRows() public Int2ObjectOpenHashMap<IntSortedSet> getComprisedRows()
{ {
return comprisedRows; return comprisedRows;
} }

View File

@ -19,12 +19,12 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap; import io.druid.collections.bitmap.MutableBitmap;
@ -43,6 +43,9 @@ import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
import it.unimi.dsi.fastutil.objects.Object2IntSortedMap;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
@ -52,38 +55,13 @@ import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
public class StringDimensionIndexer implements DimensionIndexer<Integer, int[], String> public class StringDimensionIndexer implements DimensionIndexer<Integer, int[], String>
{ {
public static final Function<Object, String> STRING_TRANSFORMER = new Function<Object, String>() private static final Function<Object, String> STRING_TRANSFORMER = o -> o != null ? o.toString() : null;
{
@Override
public String apply(final Object o)
{
if (o == null) {
return null;
}
if (o instanceof String) {
return (String) o;
}
return o.toString();
}
};
public static final Comparator<String> UNENCODED_COMPARATOR = new Comparator<String>() private static final Comparator<String> UNENCODED_COMPARATOR = Ordering.natural().nullsFirst();
{
@Override
public int compare(String o1, String o2)
{
if (o1 == null) {
return o2 == null ? 0 : -1;
}
if (o2 == null) {
return 1;
}
return o1.compareTo(o2);
}
};
private static class DimensionDictionary private static class DimensionDictionary
{ {
@ -176,7 +154,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
public SortedDimensionDictionary(List<String> idToValue, int length) public SortedDimensionDictionary(List<String> idToValue, int length)
{ {
Map<String, Integer> sortedMap = Maps.newTreeMap(); Object2IntSortedMap<String> sortedMap = new Object2IntRBTreeMap<>();
for (int id = 0; id < length; id++) { for (int id = 0; id < length; id++) {
sortedMap.put(idToValue.get(id), id); sortedMap.put(idToValue.get(id), id);
} }
@ -184,7 +162,8 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
this.idToIndex = new int[length]; this.idToIndex = new int[length];
this.indexToId = new int[length]; this.indexToId = new int[length];
int index = 0; int index = 0;
for (Integer id : sortedMap.values()) { for (IntIterator iterator = sortedMap.values().iterator(); iterator.hasNext();) {
int id = iterator.nextInt();
idToIndex[id] = index; idToIndex[id] = index;
indexToId[index] = id; indexToId[index] = id;
index++; index++;

View File

@ -25,7 +25,6 @@ import com.google.common.base.Function;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -37,9 +36,9 @@ import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
@ -205,14 +204,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
// This is modified on add() in a critical section. // This is modified on add() in a critical section.
private final ThreadLocal<InputRow> in = new ThreadLocal<>(); private final ThreadLocal<InputRow> in = new ThreadLocal<>();
private final Supplier<InputRow> rowSupplier = new Supplier<InputRow>() private final Supplier<InputRow> rowSupplier = in::get;
{
@Override
public InputRow get()
{
return in.get();
}
};
/** /**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
@ -276,7 +268,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
columnCapabilities.put(dimName, capabilities); columnCapabilities.put(dimName, capabilities);
} }
//__time capabilites //__time capabilities
ColumnCapabilitiesImpl timeCapabilities = new ColumnCapabilitiesImpl(); ColumnCapabilitiesImpl timeCapabilities = new ColumnCapabilitiesImpl();
timeCapabilities.setType(ValueType.LONG); timeCapabilities.setType(ValueType.LONG);
columnCapabilities.put(Column.TIME_COLUMN_NAME, timeCapabilities); columnCapabilities.put(Column.TIME_COLUMN_NAME, timeCapabilities);
@ -688,13 +680,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return Iterators.transform( return Iterators.transform(
getFacts().iterator(descending), getFacts().iterator(descending),
new Function<Map.Entry<TimeAndDims, Integer>, Row>() timeAndDims -> {
{ final int rowOffset = timeAndDims.getRowIndex();
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
Object[] theDims = timeAndDims.getDims(); Object[] theDims = timeAndDims.getDims();
@ -729,7 +716,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return new MapBasedRow(timeAndDims.getTimestamp(), theVals); return new MapBasedRow(timeAndDims.getTimestamp(), theVals);
} }
}
); );
} }
}; };
@ -832,19 +818,40 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public static final class TimeAndDims public static final class TimeAndDims
{ {
public static final int EMPTY_ROW_INDEX = -1;
private final long timestamp; private final long timestamp;
private final Object[] dims; private final Object[] dims;
private final List<DimensionDesc> dimensionDescsList; private final List<DimensionDesc> dimensionDescsList;
/**
* rowIndex is not checked in {@link #equals} and {@link #hashCode} on purpose. TimeAndDims acts as a Map key
* and "entry" object (rowIndex is the "value") at the same time. This is done to reduce object indirection and
* improve locality, and avoid boxing of rowIndex as Integer, when stored in JDK collection:
* {@link RollupFactsHolder} needs concurrent collections, that are not present in fastutil.
*/
private int rowIndex;
TimeAndDims( TimeAndDims(
long timestamp, long timestamp,
Object[] dims, Object[] dims,
List<DimensionDesc> dimensionDescsList List<DimensionDesc> dimensionDescsList
) )
{
this(timestamp, dims, dimensionDescsList, EMPTY_ROW_INDEX);
}
TimeAndDims(
long timestamp,
Object[] dims,
List<DimensionDesc> dimensionDescsList,
int rowIndex
)
{ {
this.timestamp = timestamp; this.timestamp = timestamp;
this.dims = dims; this.dims = dims;
this.dimensionDescsList = dimensionDescsList; this.dimensionDescsList = dimensionDescsList;
this.rowIndex = rowIndex;
} }
public long getTimestamp() public long getTimestamp()
@ -857,6 +864,16 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return dims; return dims;
} }
public int getRowIndex()
{
return rowIndex;
}
private void setRowIndex(int rowIndex)
{
this.rowIndex = rowIndex;
}
@Override @Override
public String toString() public String toString()
{ {
@ -991,87 +1008,29 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return true; return true;
} }
public static class FactsEntry implements Map.Entry<TimeAndDims, Integer>
{
TimeAndDims key = null;
Integer value = null;
public FactsEntry(TimeAndDims key, Integer value)
{
this.key = key;
this.value = value;
}
@Override
public TimeAndDims getKey()
{
return key;
}
@Override
public Integer getValue()
{
return value;
}
@Override
public Integer setValue(Integer value)
{
return value;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FactsEntry that = (FactsEntry) o;
if (key != null ? !key.equals(that.key) : that.key != null) {
return false;
}
return value != null ? value.equals(that.value) : that.value == null;
}
@Override
public int hashCode()
{
int result = key != null ? key.hashCode() : 0;
result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}
interface FactsHolder interface FactsHolder
{ {
/** /**
* @return the previous value associated with the specified key, or * @return the previous rowIndex associated with the specified key, or
* {@code null} if there was no mapping for the key. * {@code TimeAndDims#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/ */
Integer getPriorIndex(TimeAndDims key); int getPriorIndex(TimeAndDims key);
long getMinTimeMillis(); long getMinTimeMillis();
long getMaxTimeMillis(); long getMaxTimeMillis();
Iterable<Map.Entry<TimeAndDims, Integer>> entrySet(); Iterator<TimeAndDims> iterator(boolean descending);
Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending); Iterable<TimeAndDims> timeRangeIterable(boolean descending, long timeStart, long timeEnd);
Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd);
Iterable<TimeAndDims> keySet(); Iterable<TimeAndDims> keySet();
/** /**
* @return the previous value associated with the specified key, or * @return the previous rowIndex associated with the specified key, or
* {@code null} if there was no mapping for the key. * {@code TimeAndDims#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/ */
Integer putIfAbsent(TimeAndDims key, Integer rowIndex); int putIfAbsent(TimeAndDims key, int rowIndex);
void clear(); void clear();
} }
@ -1079,7 +1038,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
static class RollupFactsHolder implements FactsHolder static class RollupFactsHolder implements FactsHolder
{ {
private final boolean sortFacts; private final boolean sortFacts;
private final ConcurrentMap<TimeAndDims, Integer> facts; // Can't use Set because we need to be able to get from collection
private final ConcurrentMap<TimeAndDims, TimeAndDims> facts;
private final List<DimensionDesc> dimensionDescsList; private final List<DimensionDesc> dimensionDescsList;
public RollupFactsHolder(boolean sortFacts, Comparator<TimeAndDims> timeAndDimsComparator, List<DimensionDesc> dimensionDescsList) public RollupFactsHolder(boolean sortFacts, Comparator<TimeAndDims> timeAndDimsComparator, List<DimensionDesc> dimensionDescsList)
@ -1094,16 +1054,17 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
} }
@Override @Override
public Integer getPriorIndex(TimeAndDims key) public int getPriorIndex(TimeAndDims key)
{ {
return facts.get(key); TimeAndDims timeAndDims = facts.get(key);
return timeAndDims == null ? TimeAndDims.EMPTY_ROW_INDEX : timeAndDims.rowIndex;
} }
@Override @Override
public long getMinTimeMillis() public long getMinTimeMillis()
{ {
if (sortFacts) { if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).firstKey().getTimestamp(); return ((ConcurrentNavigableMap<TimeAndDims, TimeAndDims>) facts).firstKey().getTimestamp();
} else { } else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
} }
@ -1113,39 +1074,33 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public long getMaxTimeMillis() public long getMaxTimeMillis()
{ {
if (sortFacts) { if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).lastKey().getTimestamp(); return ((ConcurrentNavigableMap<TimeAndDims, TimeAndDims>) facts).lastKey().getTimestamp();
} else { } else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
} }
} }
@Override @Override
public Iterable<Map.Entry<TimeAndDims, Integer>> entrySet() public Iterator<TimeAndDims> iterator(boolean descending)
{
return facts.entrySet();
}
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending)
{ {
if (descending && sortFacts) { if (descending && sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).descendingMap().entrySet().iterator(); return ((ConcurrentNavigableMap<TimeAndDims, TimeAndDims>) facts).descendingMap().keySet().iterator();
} }
return entrySet().iterator(); return keySet().iterator();
} }
@Override @Override
public Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd) public Iterable<TimeAndDims> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{ {
if (!sortFacts) { if (!sortFacts) {
throw new UnsupportedOperationException("can't get timeRange from unsorted facts data."); throw new UnsupportedOperationException("can't get timeRange from unsorted facts data.");
} }
TimeAndDims start = new TimeAndDims(timeStart, new Object[]{}, dimensionDescsList); TimeAndDims start = new TimeAndDims(timeStart, new Object[]{}, dimensionDescsList);
TimeAndDims end = new TimeAndDims(timeEnd, new Object[]{}, dimensionDescsList); TimeAndDims end = new TimeAndDims(timeEnd, new Object[]{}, dimensionDescsList);
ConcurrentNavigableMap<TimeAndDims, Integer> subMap = ConcurrentNavigableMap<TimeAndDims, TimeAndDims> subMap =
((ConcurrentNavigableMap<TimeAndDims, Integer>) facts).subMap(start, end); ((ConcurrentNavigableMap<TimeAndDims, TimeAndDims>) facts).subMap(start, end);
final Map<TimeAndDims, Integer> rangeMap = descending ? subMap.descendingMap() : subMap; final Map<TimeAndDims, TimeAndDims> rangeMap = descending ? subMap.descendingMap() : subMap;
return rangeMap.entrySet(); return rangeMap.keySet();
} }
@Override @Override
@ -1155,9 +1110,12 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
} }
@Override @Override
public Integer putIfAbsent(TimeAndDims key, Integer rowIndex) public int putIfAbsent(TimeAndDims key, int rowIndex)
{ {
return facts.putIfAbsent(key, rowIndex); // setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers.
key.setRowIndex(rowIndex);
TimeAndDims prev = facts.putIfAbsent(key, key);
return prev == null ? TimeAndDims.EMPTY_ROW_INDEX : prev.rowIndex;
} }
@Override @Override
@ -1170,37 +1128,30 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
static class PlainFactsHolder implements FactsHolder static class PlainFactsHolder implements FactsHolder
{ {
private final boolean sortFacts; private final boolean sortFacts;
private final ConcurrentMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>> facts; private final ConcurrentMap<Long, Deque<TimeAndDims>> facts;
public PlainFactsHolder(boolean sortFacts) public PlainFactsHolder(boolean sortFacts)
{ {
this.sortFacts = sortFacts; this.sortFacts = sortFacts;
if (sortFacts) { if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(new Comparator<Long>() this.facts = new ConcurrentSkipListMap<>();
{
@Override
public int compare(Long lhs, Long rhs)
{
return Longs.compare(lhs, rhs);
}
});
} else { } else {
this.facts = new ConcurrentHashMap<>(); this.facts = new ConcurrentHashMap<>();
} }
} }
@Override @Override
public Integer getPriorIndex(TimeAndDims key) public int getPriorIndex(TimeAndDims key)
{ {
// always return null to indicate that no prior key cause we always add new row // always return EMPTY_ROW_INDEX to indicate that no prior key cause we always add new row
return null; return TimeAndDims.EMPTY_ROW_INDEX;
} }
@Override @Override
public long getMinTimeMillis() public long getMinTimeMillis()
{ {
if (sortFacts) { if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).firstKey(); return ((ConcurrentNavigableMap<Long, Deque<TimeAndDims>>) facts).firstKey();
} else { } else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data."); throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
} }
@ -1210,93 +1161,65 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public long getMaxTimeMillis() public long getMaxTimeMillis()
{ {
if (sortFacts) { if (sortFacts) {
return ((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).lastKey(); return ((ConcurrentNavigableMap<Long, Deque<TimeAndDims>>) facts).lastKey();
} else { } else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data."); throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
} }
} }
@Override @Override
public Iterable<Map.Entry<TimeAndDims, Integer>> entrySet() public Iterator<TimeAndDims> iterator(boolean descending)
{
return concat(facts.values(), false);
}
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator(boolean descending)
{ {
if (descending && sortFacts) { if (descending && sortFacts) {
return concat(((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts) return concat(((ConcurrentNavigableMap<Long, Deque<TimeAndDims>>) facts)
.descendingMap().values(), true).iterator(); .descendingMap().values(), true).iterator();
} }
return concat(facts.values(), false).iterator(); return concat(facts.values(), false).iterator();
} }
@Override @Override
public Iterable<Map.Entry<TimeAndDims, Integer>> timeRangeIterable(boolean descending, long timeStart, long timeEnd) public Iterable<TimeAndDims> timeRangeIterable(boolean descending, long timeStart, long timeEnd)
{ {
ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>> subMap = ConcurrentNavigableMap<Long, Deque<TimeAndDims>> subMap =
((ConcurrentNavigableMap<Long, Deque<Map.Entry<TimeAndDims, Integer>>>) facts).subMap(timeStart, timeEnd); ((ConcurrentNavigableMap<Long, Deque<TimeAndDims>>) facts).subMap(timeStart, timeEnd);
final Map<Long, Deque<Map.Entry<TimeAndDims, Integer>>> rangeMap = descending ? subMap.descendingMap() : subMap; final Map<Long, Deque<TimeAndDims>> rangeMap = descending ? subMap.descendingMap() : subMap;
return concat(rangeMap.values(), descending); return concat(rangeMap.values(), descending);
} }
private Iterable<Map.Entry<TimeAndDims, Integer>> concat( private Iterable<TimeAndDims> concat(
final Iterable<Deque<Map.Entry<TimeAndDims, Integer>>> iterable, final Iterable<Deque<TimeAndDims>> iterable,
final boolean descending final boolean descending
) )
{ {
return new Iterable<Map.Entry<TimeAndDims, Integer>>() return () -> Iterators.concat(
{
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> iterator()
{
return Iterators.concat(
Iterators.transform( Iterators.transform(
iterable.iterator(), iterable.iterator(),
new Function<Deque<Map.Entry<TimeAndDims, Integer>>, Iterator<Map.Entry<TimeAndDims, Integer>>>() input -> descending ? input.descendingIterator() : input.iterator()
{
@Override
public Iterator<Map.Entry<TimeAndDims, Integer>> apply(Deque<Map.Entry<TimeAndDims, Integer>> input)
{
return descending ? input.descendingIterator() : input.iterator();
}
}
) )
); );
} }
};
}
@Override @Override
public Iterable<TimeAndDims> keySet() public Iterable<TimeAndDims> keySet()
{ {
return Iterables.transform( return concat(facts.values(), false);
entrySet(),
new Function<Map.Entry<TimeAndDims, Integer>, TimeAndDims>()
{
@Override
public TimeAndDims apply(Map.Entry<TimeAndDims, Integer> input)
{
return input.getKey();
}
}
);
} }
@Override @Override
public Integer putIfAbsent(TimeAndDims key, Integer rowIndex) public int putIfAbsent(TimeAndDims key, int rowIndex)
{ {
Long time = key.getTimestamp(); Long time = key.getTimestamp();
Deque<Map.Entry<TimeAndDims, Integer>> rows = facts.get(time); Deque<TimeAndDims> rows = facts.get(time);
if (rows == null) { if (rows == null) {
facts.putIfAbsent(time, new ConcurrentLinkedDeque<Map.Entry<TimeAndDims, Integer>>()); facts.putIfAbsent(time, new ConcurrentLinkedDeque<>());
// in race condition, rows may be put by other thread, so always get latest status from facts // in race condition, rows may be put by other thread, so always get latest status from facts
rows = facts.get(time); rows = facts.get(time);
} }
rows.add(new FactsEntry(key, rowIndex)); // setRowIndex() must be called before rows.add() for visibility of rowIndex from concurrent readers.
// always return null to indicate that we always add new row key.setRowIndex(rowIndex);
return null; rows.add(key);
// always return EMPTY_ROW_INDEX to indicate that we always add new row
return TimeAndDims.EMPTY_ROW_INDEX;
} }
@Override @Override

View File

@ -179,17 +179,16 @@ public class IncrementalIndexAdapter implements IndexableAdapter
* iterator() call to ensure the counter starts at 0. * iterator() call to ensure the counter starts at 0.
*/ */
return Iterators.transform( return Iterators.transform(
index.getFacts().entrySet().iterator(), index.getFacts().keySet().iterator(),
new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>() new Function<IncrementalIndex.TimeAndDims, Rowboat>()
{ {
int count = 0; int count = 0;
@Override @Override
public Rowboat apply(Map.Entry<IncrementalIndex.TimeAndDims, Integer> input) public Rowboat apply(IncrementalIndex.TimeAndDims timeAndDims)
{ {
final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
final Object[] dimValues = timeAndDims.getDims(); final Object[] dimValues = timeAndDims.getDims();
final int rowOffset = input.getValue(); final int rowOffset = timeAndDims.getRowIndex();
Object[] dims = new Object[dimValues.length]; Object[] dims = new Object[dimValues.length];
for (IncrementalIndex.DimensionDesc dimension : dimensions) { for (IncrementalIndex.DimensionDesc dimension : dimensions) {

View File

@ -241,8 +241,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{ {
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this); private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this);
private final int maxRowIndex; private final int maxRowIndex;
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter; private Iterator<IncrementalIndex.TimeAndDims> baseIter;
private Iterable<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> cursorIterable; private Iterable<IncrementalIndex.TimeAndDims> cursorIterable;
private boolean emptyRange; private boolean emptyRange;
final DateTime time; final DateTime time;
int numAdvanced = -1; int numAdvanced = -1;
@ -278,8 +278,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
while (baseIter.hasNext()) { while (baseIter.hasNext()) {
BaseQuery.checkInterrupted(); BaseQuery.checkInterrupted();
Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next(); IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) { if (beyondMaxRowIndex(entry.getRowIndex())) {
continue; continue;
} }
@ -306,8 +306,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return; return;
} }
Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next(); IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) { if (beyondMaxRowIndex(entry.getRowIndex())) {
continue; continue;
} }
@ -358,8 +358,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
boolean foundMatched = false; boolean foundMatched = false;
while (baseIter.hasNext()) { while (baseIter.hasNext()) {
Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next(); IncrementalIndex.TimeAndDims entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) { if (beyondMaxRowIndex(entry.getRowIndex())) {
numAdvanced++; numAdvanced++;
continue; continue;
} }
@ -642,26 +642,26 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public static class EntryHolder public static class EntryHolder
{ {
Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry = null; IncrementalIndex.TimeAndDims currEntry = null;
public Map.Entry<IncrementalIndex.TimeAndDims, Integer> get() public IncrementalIndex.TimeAndDims get()
{ {
return currEntry; return currEntry;
} }
public void set(Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry) public void set(IncrementalIndex.TimeAndDims currEntry)
{ {
this.currEntry = currEntry; this.currEntry = currEntry;
} }
public IncrementalIndex.TimeAndDims getKey() public IncrementalIndex.TimeAndDims getKey()
{ {
return currEntry.getKey(); return currEntry;
} }
public Integer getValue() public int getValue()
{ {
return currEntry.getValue(); return currEntry.getRowIndex();
} }
} }

View File

@ -189,8 +189,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
int bufferOffset; int bufferOffset;
synchronized (this) { synchronized (this) {
final Integer priorIndex = facts.getPriorIndex(key); final int priorIndex = facts.getPriorIndex(key);
if (null != priorIndex) { if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex); final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
bufferIndex = indexAndOffset[0]; bufferIndex = indexAndOffset[0];
bufferOffset = indexAndOffset[1]; bufferOffset = indexAndOffset[1];
@ -236,7 +236,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
} }
// Last ditch sanity checks // Last ditch sanity checks
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) { if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
} }
@ -245,8 +245,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
// note that indexAndOffsets must be updated before facts, because as soon as we update facts // note that indexAndOffsets must be updated before facts, because as soon as we update facts
// concurrent readers get hold of it and might ask for newly added row // concurrent readers get hold of it and might ask for newly added row
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
final Integer prev = facts.putIfAbsent(key, rowIndex); final int prev = facts.putIfAbsent(key, rowIndex);
if (null == prev) { if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet(); numEntries.incrementAndGet();
} else { } else {
throw new ISE("WTF! we are in sychronized block."); throw new ISE("WTF! we are in sychronized block.");

View File

@ -180,11 +180,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
Supplier<InputRow> rowSupplier Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException ) throws IndexSizeExceededException
{ {
final Integer priorIndex = facts.getPriorIndex(key); final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs; Aggregator[] aggs;
if (null != priorIndex) { if (TimeAndDims.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex); aggs = concurrentGet(priorIndex);
doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions); doAggregate(metrics, aggs, rowContainer, row, reportParseExceptions);
} else { } else {
@ -196,11 +196,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
concurrentSet(rowIndex, aggs); concurrentSet(rowIndex, aggs);
// Last ditch sanity checks // Last ditch sanity checks
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == null) { if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
} }
final Integer prev = facts.putIfAbsent(key, rowIndex); final int prev = facts.putIfAbsent(key, rowIndex);
if (null == prev) { if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet(); numEntries.incrementAndGet();
} else { } else {
// We lost a race // We lost a race

View File

@ -169,11 +169,11 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
// Last ditch sanity checks // Last ditch sanity checks
if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == null) { if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == TimeAndDims.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows reached"); throw new IndexSizeExceededException("Maximum number of rows reached");
} }
final Integer prev = getFacts().putIfAbsent(key, rowIndex); final int prev = getFacts().putIfAbsent(key, rowIndex);
if (null == prev) { if (TimeAndDims.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet(); numEntries.incrementAndGet();
} else { } else {
// We lost a race // We lost a race

View File

@ -31,7 +31,6 @@ import org.easymock.EasyMock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -83,8 +82,8 @@ public class OnheapIncrementalIndexTest
public void run() public void run()
{ {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
for (Map.Entry<IncrementalIndex.TimeAndDims, Integer> row : index.getFacts().entrySet()) { for (IncrementalIndex.TimeAndDims row : index.getFacts().keySet()) {
if (index.getMetricLongValue(row.getValue(), 0) != 1) { if (index.getMetricLongValue(row.getRowIndex(), 0) != 1) {
checkFailedCount.addAndGet(1); checkFailedCount.addAndGet(1);
} }
} }