TimeAndDims optimise to store indexes

This commit is contained in:
nishantmonu51 2014-09-11 16:13:37 +05:30
parent 2e3be39048
commit c39eaf870b
3 changed files with 170 additions and 115 deletions

View File

@ -64,11 +64,11 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -314,8 +314,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make();
this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).make();
db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable().make();
this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).comparator(
new TimeAndDimsComparator(this)
).make();
}
public IncrementalIndex(
@ -394,10 +397,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
final List<String> rowDimensions = row.getDimensions();
String[][] dims;
List<String[]> overflow = null;
int[][] dims;
List<int[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
dims = new int[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
@ -421,9 +424,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
overflow.add(getDimIndexes(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
dims[index] = getDimIndexes(dimValues.get(dimension), dimensionValues);
}
}
}
@ -431,7 +434,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
int[][] newDims = new int[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
@ -492,19 +495,20 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return facts.lastKey().getTimestamp();
}
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
private int[] getDimIndexes(final DimDim dimLookup, final List<String> dimValues)
{
final String[] retVal = new String[dimValues.size()];
final int[] retVal = new int[dimValues.size()];
int count = 0;
for (String dimValue : dimValues) {
if (!dimLookup.contains(dimValue)) {
dimLookup.add(dimValue);
final String[] vals = dimValues.toArray(new String[0]);
Arrays.sort(vals);
for (String dimValue : vals) {
int id = dimLookup.getId(dimValue);
if (id == -1) {
id = dimLookup.add(dimValue);
}
retVal[count] = dimValue;
retVal[count] = id;
count++;
}
Arrays.sort(retVal);
return retVal;
}
@ -622,11 +626,12 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
String[][] theDims = timeAndDims.getDims();
int[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = theDims[i];
String[] dim = getDimValues(dimValues.get(dimensions.get(i)), theDims[i]);
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
@ -650,6 +655,23 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
};
}
/**
 * Translates an array of dimension value ids into the corresponding string values.
 *
 * @param dims       lookup used to resolve each id via {@code getValue(int)}
 * @param dimIndexes ids to resolve; may be {@code null}
 *
 * @return {@code null} when {@code dimIndexes} is {@code null}, otherwise a new array of the same
 * length whose i-th entry is {@code dims.getValue(dimIndexes[i])}
 */
public String[] getDimValues(DimDim dims, int[] dimIndexes)
{
  if (dimIndexes != null) {
    final String[] resolved = new String[dimIndexes.length];
    int pos = 0;
    for (int id : dimIndexes) {
      resolved[pos++] = dims.getValue(id);
    }
    return resolved;
  }
  // A missing id array stays missing rather than becoming an empty array.
  return null;
}
/**
 * Resolves a single dimension value id to its string value.
 *
 * @param dims     lookup used to resolve the id
 * @param dimIndex id to resolve
 *
 * @return whatever {@code dims.getValue(dimIndex)} returns
 */
public String getDimValue(DimDim dims, int dimIndex)
{
  final String resolved = dims.getValue(dimIndex);
  return resolved;
}
@Override
public void close()
{
@ -662,14 +684,14 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
static class TimeAndDims implements Comparable<TimeAndDims>, Serializable
static class TimeAndDims implements Serializable
{
private final long timestamp;
private final String[][] dims;
private final int[][] dims;
TimeAndDims(
long timestamp,
String[][] dims
int[][] dims
)
{
this.timestamp = timestamp;
@ -681,60 +703,21 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return timestamp;
}
String[][] getDims()
int[][] getDims()
{
return dims;
}
@Override
public int compareTo(TimeAndDims rhs)
{
int retVal = Longs.compare(timestamp, rhs.timestamp);
if (retVal == 0) {
retVal = Ints.compare(dims.length, rhs.dims.length);
}
int index = 0;
while (retVal == 0 && index < dims.length) {
String[] lhsVals = dims[index];
String[] rhsVals = rhs.dims[index];
if (lhsVals == null) {
if (rhsVals == null) {
++index;
continue;
}
return -1;
}
if (rhsVals == null) {
return 1;
}
retVal = Ints.compare(lhsVals.length, rhsVals.length);
int valsIndex = 0;
while (retVal == 0 && valsIndex < lhsVals.length) {
retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]);
++valsIndex;
}
++index;
}
return retVal;
}
@Override
public String toString()
{
return "TimeAndDims{" +
"timestamp=" + new DateTime(timestamp) +
", dims=" + Lists.transform(
Arrays.asList(dims), new Function<String[], Object>()
Arrays.asList(dims), new Function<int[], Object>()
{
@Override
public Object apply(@Nullable String[] input)
public Object apply(@Nullable int[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
@ -747,6 +730,67 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
/**
 * Orders {@link TimeAndDims} keys by, in sequence:
 * <ol>
 * <li>timestamp,</li>
 * <li>number of dimension slots,</li>
 * <li>each dimension slot in turn: a {@code null} slot sorts before a non-null one, a shorter id
 * array sorts before a longer one, and arrays of equal length are compared position by position
 * using the <em>string values</em> the ids resolve to (not the numeric ids).</li>
 * </ol>
 *
 * The class is declared {@link Serializable} only because MapDB asserts that of its comparators.
 * The owning index is held in a {@code transient} field, so Java serialization does not write it;
 * NOTE(review): a deserialized instance would therefore have a null index and fail in
 * {@link #compare} -- this presumably is only ever used in-process; confirm before persisting.
 */
public static class TimeAndDimsComparator implements Comparator, Serializable
{
  // mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert.
  private transient final IncrementalIndex incrementalIndex;

  /**
   * @param incrementalIndex index whose dimension lookups are used to resolve ids to values
   */
  public TimeAndDimsComparator(IncrementalIndex incrementalIndex)
  {
    this.incrementalIndex = incrementalIndex;
  }

  /**
   * Compares two {@link TimeAndDims} keys.
   *
   * @param o1 left-hand key, must be a {@link TimeAndDims}
   * @param o2 right-hand key, must be a {@link TimeAndDims}
   *
   * @return negative, zero or positive per the ordering described on the class
   *
   * @throws ClassCastException if either argument is not a {@link TimeAndDims}
   */
  @Override
  public int compare(Object o1, Object o2)
  {
    final TimeAndDims lhs = (TimeAndDims) o1;
    final TimeAndDims rhs = (TimeAndDims) o2;

    int retVal = Longs.compare(lhs.timestamp, rhs.timestamp);
    if (retVal == 0) {
      retVal = Ints.compare(lhs.dims.length, rhs.dims.length);
    }

    int index = 0;
    while (retVal == 0 && index < lhs.dims.length) {
      final int[] lhsIndexes = lhs.dims[index];
      final int[] rhsIndexes = rhs.dims[index];

      if (lhsIndexes == null) {
        if (rhsIndexes == null) {
          ++index;
          continue;
        }
        return -1;
      }
      if (rhsIndexes == null) {
        return 1;
      }

      retVal = Ints.compare(lhsIndexes.length, rhsIndexes.length);

      // Looked up lazily: not needed when the lengths differ or when every id pair is identical.
      DimDim dimDim = null;
      int valsIndex = 0;
      while (retVal == 0 && valsIndex < lhsIndexes.length) {
        final int lhsId = lhsIndexes[valsIndex];
        final int rhsId = rhsIndexes[valsIndex];
        // The same id resolves to the same string through the same lookup, so the comparison
        // result is 0 and the two value lookups can be skipped.
        if (lhsId != rhsId) {
          if (dimDim == null) {
            dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
          }
          retVal = incrementalIndex.getDimValue(dimDim, lhsId)
                                   .compareTo(incrementalIndex.getDimValue(dimDim, rhsId));
        }
        ++valsIndex;
      }
      ++index;
    }
    return retVal;
  }
}
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
@ -783,19 +827,24 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
{
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
private final String dimName;
private volatile Map<String, Integer> sortedIds = null;
private volatile Map<Integer, String> sortedIdsReverse = null;
// size on MapDB.HTreeMap is slow so maintain a count here
private volatile int size=0;
private volatile int size = 0;
public DimDim(String dimName)
{
falseIds = db.createHashMap(dimName).make();
this.dimName = dimName;
falseIds = db.createTreeMap(dimName).make();
falseIdsReverse = db.createHashMap(dimName + "_inverse").make();
}
public int getId(String value)
{
return falseIds.get(value);
Integer id = falseIds.get(value);
return id == null ? -1 : id;
}
public String getValue(int id)
@ -813,46 +862,51 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return size;
}
/**
 * Exposes the dimension values known to this lookup.
 *
 * @return the key set of the value-to-id map ({@code falseIds}), as returned by the map itself
 */
public Set<String> keySet()
{
  final Set<String> knownValues = falseIds.keySet();
  return knownValues;
}
public synchronized void add(String value)
public synchronized int add(String value)
{
assertNotSorted();
final int id = size++;
falseIds.put(value, id);
falseIdsReverse.put(id, value);
return id;
}
public int getSortedId(String value)
{
assertSorted();
return Arrays.binarySearch(sortedVals, value);
return sortedIds.get(value);
}
public String getSortedValue(int index)
{
assertSorted();
return sortedVals[index];
return sortedIdsReverse.get(index);
}
public void sort()
{
if (sortedVals == null) {
sortedVals = new String[size];
int index = 0;
if (sortedIds == null) {
sortedIds = db.createHashMap(dimName + "sorted").make();
sortedIdsReverse = db.createHashMap(dimName + "sortedInverse").make();
int i = 0;
for (String value : falseIds.keySet()) {
sortedVals[index++] = value;
int sortedIndex = i++;
sortedIds.put(value, sortedIndex);
sortedIdsReverse.put(sortedIndex, value);
}
Arrays.sort(sortedVals);
}
}
private void assertSorted()
{
if (sortedVals == null) {
if (sortedIds == null) {
throw new ISE("Call sort() before calling the getSorted* methods.");
}
}
/**
 * Guards mutation: fails once {@code sortedIds} has been populated, i.e. after {@code sort()} has
 * run, because the sorted id mappings would no longer cover values added afterwards.
 *
 * @throws ISE if the sorted mappings already exist
 */
private void assertNotSorted()
{
  if (sortedIds != null) {
    // Previously reused assertSorted()'s text, which told the caller to do the opposite.
    throw new ISE("Cannot add values after sort() has been called.");
  }
}

View File

@ -23,11 +23,9 @@ import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.Rowboat;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -65,7 +63,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
int rowNum = 0;
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final String[][] dims = timeAndDims.getDims();
final int[][] dims = timeAndDims.getDims();
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
@ -78,8 +76,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter
if (dimIndex >= dims.length || dims[dimIndex] == null) {
continue;
}
for (String dimValue : dims[dimIndex]) {
final String[] dimValues = index.getDimValues(index.getDimension(dimension), dims[dimIndex]);
for (String dimValue : dimValues) {
ConciseSet conciseSet = conciseSets.get(dimValue);
if (conciseSet == null) {
@ -180,27 +178,27 @@ public class IncrementalIndexAdapter implements IndexableAdapter
)
{
final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
final String[][] dimValues = timeAndDims.getDims();
final int[][] dimValueIndexes = timeAndDims.getDims();
final int rowOffset = input.getValue();
int[][] dims = new int[dimValues.length][];
int[][] dims = new int[dimValueIndexes.length][];
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
dimDim.sort();
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {
if (dimIndex >= dimValueIndexes.length || dimValueIndexes[dimIndex] == null) {
continue;
}
dims[dimIndex] = new int[dimValues[dimIndex].length];
dims[dimIndex] = new int[dimValueIndexes[dimIndex].length];
if (dimIndex >= dims.length || dims[dimIndex] == null) {
continue;
}
for (int i = 0; i < dimValues[dimIndex].length; ++i) {
dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]);
String[] dimValues = index.getDimValues(dimDim, dimValueIndexes[dimIndex]);
for (int i = 0; i < dimValues.length; ++i) {
dims[dimIndex][i] = dimDim.getSortedId(dimValues[i]);
}
}

View File

@ -29,8 +29,8 @@ import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
@ -174,10 +174,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
cursorMap = index.getSubMap(
new IncrementalIndex.TimeAndDims(
timeStart, new String[][]{}
timeStart, new int[][]{}
),
new IncrementalIndex.TimeAndDims(
Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{}
Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{}
)
);
time = gran.toDateTime(input);
@ -293,12 +293,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimIndex < currEntry.getKey().getDims().length) {
final String[] dimVals = currEntry.getKey().getDims()[dimIndex];
final int[] dimVals = currEntry.getKey().getDims()[dimIndex];
if (dimVals != null) {
for (String dimVal : dimVals) {
int id = dimValLookup.getId(dimVal);
if (id < maxId) {
vals.add(id);
for (int dimVal : dimVals) {
if (dimVal < maxId) {
vals.add(dimVal);
}
}
}
@ -409,8 +408,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if (dimensionIndexInt != null) {
final IncrementalIndex.DimDim dimDim = index.getDimension(columnName);
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<Object>()
{
@ -423,7 +422,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
final String[] dimVals = index.getDimValues(dimDim, currEntry.getKey().getDims()[dimensionIndex]);
if (dimVals.length == 1) {
return dimVals[0];
} else if (dimVals.length == 0) {
@ -488,15 +488,16 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
@Override
public ValueMatcher makeValueMatcher(final String dimension,final String value)
public ValueMatcher makeValueMatcher(final String dimension,String valueParam)
{
final String value = valueParam == null ? "" : valueParam;
Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase());
if (dimIndexObject == null) {
return new BooleanValueMatcher(false);
}
if (!index.getDimension(dimension.toLowerCase()).contains(value)) {
if (value == null || "".equals(value)) {
if ("".equals(value)) {
final int dimIndex = dimIndexObject;
return new ValueMatcher()
@ -504,7 +505,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return true;
}
@ -516,18 +517,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int dimIndex = dimIndexObject;
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : dims[dimIndex]) {
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
if (value.equals(dimVal)) {
return true;
}
@ -545,18 +547,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : dims[dimIndex]) {
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
if (predicate.apply(dimVal)) {
return true;
}
@ -580,12 +582,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
int[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
for (String dimVal : dims[dimIndex]) {
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {