diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 0e94e8b22e3..def34d3a092 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -64,11 +64,11 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -314,8 +314,11 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make(); - this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).make(); + db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable().make(); + this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).comparator( + new TimeAndDimsComparator(this) + ).make(); + } public IncrementalIndex( @@ -394,10 +397,10 @@ public class IncrementalIndex implements Iterable, Closeable final List rowDimensions = row.getDimensions(); - String[][] dims; - List overflow = null; + int[][] dims; + List overflow = null; synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; + dims = new int[dimensionOrder.size()][]; for (String dimension : rowDimensions) { dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); @@ -421,9 +424,9 @@ public class IncrementalIndex implements Iterable, Closeable if (overflow == null) { overflow = Lists.newArrayList(); } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + overflow.add(getDimIndexes(dimValues.add(dimension), dimensionValues)); } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + dims[index] = getDimIndexes(dimValues.get(dimension), dimensionValues); } } } @@ -431,7 +434,7 @@ public class IncrementalIndex implements Iterable, Closeable if (overflow != null) { // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; + int[][] newDims = new int[dims.length + overflow.size()][]; System.arraycopy(dims, 0, newDims, 0, dims.length); for (int i = 0; i < overflow.size(); ++i) { newDims[dims.length + i] = overflow.get(i); @@ -492,19 +495,20 @@ public class IncrementalIndex implements Iterable, Closeable return facts.lastKey().getTimestamp(); } - private String[] getDimVals(final DimDim dimLookup, final List dimValues) + private int[] getDimIndexes(final DimDim dimLookup, final List dimValues) { - final String[] retVal = new String[dimValues.size()]; - + final int[] retVal = new int[dimValues.size()]; int count = 0; - for (String dimValue : dimValues) { - if (!dimLookup.contains(dimValue)) { - dimLookup.add(dimValue); + final String[] vals = dimValues.toArray(new String[0]); + Arrays.sort(vals); + for (String dimValue : vals) { + int id = dimLookup.getId(dimValue); + if (id == -1) { + id = dimLookup.add(dimValue); } - retVal[count] = dimValue; + retVal[count] = id; count++; } - Arrays.sort(retVal); return retVal; } @@ -622,11 +626,12 @@ public class IncrementalIndex implements Iterable, Closeable final TimeAndDims timeAndDims = input.getKey(); final int rowOffset = input.getValue(); - String[][] theDims = timeAndDims.getDims(); + int[][] theDims = timeAndDims.getDims(); Map theVals = Maps.newLinkedHashMap(); for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; + String[] dim = getDimValues(dimValues.get(dimensions.get(i)), theDims[i]); + if (dim != null && dim.length != 0) { theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); } @@ -650,6 +655,23 @@ public class IncrementalIndex implements Iterable, Closeable }; } + public String[] getDimValues(DimDim dims, int[] dimIndexes) + { + if (dimIndexes == null) { + return null; + } + String[] vals = new String[dimIndexes.length]; + for (int i = 0; i < dimIndexes.length; i++) { + vals[i] = dims.getValue(dimIndexes[i]); + } + return vals; + } + + public String getDimValue(DimDim dims, int dimIndex) + { + return dims.getValue(dimIndex); + } + @Override public void close() { @@ -662,14 +684,14 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class TimeAndDims implements Comparable, Serializable + static class TimeAndDims implements Serializable { private final long timestamp; - private final String[][] dims; + private final int[][] dims; TimeAndDims( long timestamp, - String[][] dims + int[][] dims ) { this.timestamp = timestamp; @@ -681,60 +703,21 @@ public class IncrementalIndex implements Iterable, Closeable return timestamp; } - String[][] getDims() + int[][] getDims() { return dims; } - @Override - public int compareTo(TimeAndDims rhs) - { - int retVal = Longs.compare(timestamp, rhs.timestamp); - - if (retVal == 0) { - retVal = Ints.compare(dims.length, rhs.dims.length); - } - - int index = 0; - while (retVal == 0 && index < dims.length) { - String[] lhsVals = dims[index]; - String[] rhsVals = rhs.dims[index]; - - if (lhsVals == null) { - if (rhsVals == null) { - ++index; - continue; - } - return -1; - } - - if (rhsVals == null) { - return 1; - } - - retVal = Ints.compare(lhsVals.length, rhsVals.length); - - int valsIndex = 0; - while (retVal == 0 && valsIndex < lhsVals.length) { - retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]); - ++valsIndex; - } - ++index; - } - - return retVal; - } - @Override public String toString() { return "TimeAndDims{" + "timestamp=" + new DateTime(timestamp) + ", dims=" + Lists.transform( - Arrays.asList(dims), new Function() + Arrays.asList(dims), new Function() { @Override - public Object apply(@Nullable String[] input) + public Object apply(@Nullable int[] input) { if (input == null || input.length == 0) { return Arrays.asList("null"); @@ -747,6 +730,67 @@ public class IncrementalIndex implements Iterable, Closeable } } + public static class TimeAndDimsComparator implements Comparator, Serializable + { + // mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert. + private transient final IncrementalIndex incrementalIndex; + + public TimeAndDimsComparator(IncrementalIndex incrementalIndex) + { + this.incrementalIndex = incrementalIndex; + } + + @Override + public int compare(Object o1, Object o2) + { + TimeAndDims lhs = (TimeAndDims) o1; + TimeAndDims rhs = (TimeAndDims) o2; + + int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); + + if (retVal == 0) { + retVal = Ints.compare(lhs.dims.length, rhs.dims.length); + } + + int index = 0; + while (retVal == 0 && index < lhs.dims.length) { + int[] lhsIndexes = lhs.dims[index]; + int[] rhsIndexes = rhs.dims[index]; + + if (lhsIndexes == null) { + if (rhsIndexes == null) { + ++index; + continue; + } + return -1; + } + + if (rhsIndexes == null) { + return 1; + } + + retVal = Ints.compare(lhsIndexes.length, rhsIndexes.length); + + int valsIndex = 0; + + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + + while (retVal == 0 && valsIndex < lhsIndexes.length) { + retVal = incrementalIndex.getDimValue(dimDim, lhsIndexes[valsIndex]).compareTo( + incrementalIndex.getDimValue( + dimDim, + rhsIndexes[valsIndex] + ) + ); + ++valsIndex; + } + ++index; + } + + return retVal; + } + } + class DimensionHolder { private final Map dimensions; @@ -783,19 +827,24 @@ public class IncrementalIndex implements Iterable, Closeable { private final Map falseIds; private final Map falseIdsReverse; - private volatile String[] sortedVals = null; + private final String dimName; + private volatile Map sortedIds = null; + private volatile Map sortedIdsReverse = null; // size on MapDB.HTreeMap is slow so maintain a count here - private volatile int size=0; + private volatile int size = 0; public DimDim(String dimName) { - falseIds = db.createHashMap(dimName).make(); + this.dimName = dimName; + + falseIds = db.createTreeMap(dimName).make(); falseIdsReverse = db.createHashMap(dimName + "_inverse").make(); } public int getId(String value) { - return falseIds.get(value); + Integer id = falseIds.get(value); + return id == null ? -1 : id; } public String getValue(int id) @@ -813,46 +862,51 @@ public class IncrementalIndex implements Iterable, Closeable return size; } - public Set keySet() - { - return falseIds.keySet(); - } - - public synchronized void add(String value) + public synchronized int add(String value) { + assertNotSorted(); final int id = size++; falseIds.put(value, id); falseIdsReverse.put(id, value); + return id; } public int getSortedId(String value) { assertSorted(); - return Arrays.binarySearch(sortedVals, value); + return sortedIds.get(value); } public String getSortedValue(int index) { assertSorted(); - return sortedVals[index]; + return sortedIdsReverse.get(index); } public void sort() { - if (sortedVals == null) { - sortedVals = new String[size]; - - int index = 0; + if (sortedIds == null) { + sortedIds = db.createHashMap(dimName + "sorted").make(); + sortedIdsReverse = db.createHashMap(dimName + "sortedInverse").make(); + int i = 0; for (String value : falseIds.keySet()) { - sortedVals[index++] = value; + int sortedIndex = i++; + sortedIds.put(value, sortedIndex); + sortedIdsReverse.put(sortedIndex, value); } - Arrays.sort(sortedVals); } } private void assertSorted() { - if (sortedVals == null) { + if (sortedIds == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + private void assertNotSorted() + { + if (sortedIds != null) { throw new ISE("Call sort() before calling the getSorted* methods."); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 0d7d10d2212..34e71d3c02b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -23,11 +23,9 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.segment.IndexableAdapter; import io.druid.segment.Rowboat; import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; @@ -65,7 +63,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter int rowNum = 0; for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) { - final String[][] dims = timeAndDims.getDims(); + final int[][] dims = timeAndDims.getDims(); for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); @@ -78,8 +76,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter if (dimIndex >= dims.length || dims[dimIndex] == null) { continue; } - - for (String dimValue : dims[dimIndex]) { + final String[] dimValues = index.getDimValues(index.getDimension(dimension), dims[dimIndex]); + for (String dimValue : dimValues) { ConciseSet conciseSet = conciseSets.get(dimValue); if (conciseSet == null) { @@ -180,27 +178,27 @@ public class IncrementalIndexAdapter implements IndexableAdapter ) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); - final String[][] dimValues = timeAndDims.getDims(); + final int[][] dimValueIndexes = timeAndDims.getDims(); final int rowOffset = input.getValue(); - int[][] dims = new int[dimValues.length][]; + int[][] dims = new int[dimValueIndexes.length][]; for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); dimDim.sort(); - if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) { + if (dimIndex >= dimValueIndexes.length || dimValueIndexes[dimIndex] == null) { continue; } - dims[dimIndex] = new int[dimValues[dimIndex].length]; + dims[dimIndex] = new int[dimValueIndexes[dimIndex].length]; if (dimIndex >= dims.length || dims[dimIndex] == null) { continue; } - - for (int i = 0; i < dimValues[dimIndex].length; ++i) { - dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]); + String[] dimValues = index.getDimValues(dimDim, dimValueIndexes[dimIndex]); + for (int i = 0; i < dimValues.length; ++i) { + dims[dimIndex][i] = dimDim.getSortedId(dimValues[i]); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 214bba923f3..507881d2a99 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,8 +29,8 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.QueryInterruptedException; +import io.druid.query.aggregation.BufferAggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherFactory; @@ -174,10 +174,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { cursorMap = index.getSubMap( new IncrementalIndex.TimeAndDims( - timeStart, new String[][]{} + timeStart, new int[][]{} ), new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{} + Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{} ) ); time = gran.toDateTime(input); @@ -293,12 +293,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { final ArrayList vals = Lists.newArrayList(); if (dimIndex < currEntry.getKey().getDims().length) { - final String[] dimVals = currEntry.getKey().getDims()[dimIndex]; + final int[] dimVals = currEntry.getKey().getDims()[dimIndex]; if (dimVals != null) { - for (String dimVal : dimVals) { - int id = dimValLookup.getId(dimVal); - if (id < maxId) { - vals.add(id); + for (int dimVal : dimVals) { + if (dimVal < maxId) { + vals.add(dimVal); } } } @@ -409,8 +408,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final Integer dimensionIndexInt = index.getDimensionIndex(columnName); - if (dimensionIndexInt != null) { + final IncrementalIndex.DimDim dimDim = index.getDimension(columnName); final int dimensionIndex = dimensionIndexInt; return new ObjectColumnSelector() { @@ -423,7 +422,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; + final String[] dimVals = index.getDimValues(dimDim, currEntry.getKey().getDims()[dimensionIndex]); + if (dimVals.length == 1) { return dimVals[0]; } else if (dimVals.length == 0) { @@ -488,15 +488,16 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public ValueMatcher makeValueMatcher(final String dimension,final String value) + public ValueMatcher makeValueMatcher(final String dimension,String valueParam) { + final String value = valueParam == null ? "" : valueParam; Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); if (dimIndexObject == null) { return new BooleanValueMatcher(false); } if (!index.getDimension(dimension.toLowerCase()).contains(value)) { - if (value == null || "".equals(value)) { + if ("".equals(value)) { final int dimIndex = dimIndexObject; return new ValueMatcher() @@ -504,7 +505,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return true; } @@ -516,18 +517,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { if (value.equals(dimVal)) { return true; } @@ -545,18 +547,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new BooleanValueMatcher(false); } final int dimIndex = dimIndexObject; - + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { if (predicate.apply(dimVal)) { return true; } @@ -580,12 +582,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); float[] coords = new float[stringCoords.size()]; for (int j = 0; j < coords.length; j++) {