From 178f002f05d7a52a5a583b336a8a498dea8924a8 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 9 Sep 2014 15:59:33 +0530 Subject: [PATCH 01/12] indexing working with mapdb --- pom.xml | 5 + processing/pom.xml | 4 + .../segment/incremental/IncrementalIndex.java | 117 +++++++++--------- .../IncrementalIndexStorageAdapter.java | 9 +- 4 files changed, 71 insertions(+), 64 deletions(-) diff --git a/pom.xml b/pom.xml index ce074209e92..c097a937158 100644 --- a/pom.xml +++ b/pom.xml @@ -429,6 +429,11 @@ 2.3.0 provided + + org.mapdb + mapdb + 1.0.6 + diff --git a/processing/pom.xml b/processing/pom.xml index 8b04ea52763..7ccbed8b4da 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -86,6 +86,10 @@ net.jpountz.lz4 lz4 + + org.mapdb + mapdb + diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c1ab7d49327..4b22b71ee36 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -21,8 +21,6 @@ package io.druid.segment.incremental; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; @@ -56,10 +54,13 @@ import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.mapdb.DB; +import org.mapdb.DBMaker; import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -69,7 +70,6 @@ import java.util.List; import java.util.Map; import 
java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -91,8 +91,9 @@ public class IncrementalIndex implements Iterable, Closeable private final CopyOnWriteArrayList dimensions; private final DimensionHolder dimValues; private final Map columnCapabilities; - private final ConcurrentSkipListMap facts; + private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; + private final DB db; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. private ThreadLocal in = new ThreadLocal<>(); @@ -312,7 +313,8 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.facts = new ConcurrentSkipListMap<>(); + db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make(); + this.facts = db.createTreeMap("facts").make(); } public IncrementalIndex( @@ -495,13 +497,10 @@ public class IncrementalIndex implements Iterable, Closeable int count = 0; for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.get(dimValue); - if (canonicalDimValue == null) { - canonicalDimValue = dimValue; + if (!dimLookup.contains(dimValue)) { dimLookup.add(dimValue); } - - retVal[count] = canonicalDimValue; + retVal[count] = dimValue; count++; } Arrays.sort(retVal); @@ -589,7 +588,7 @@ public class IncrementalIndex implements Iterable, Closeable return columnCapabilities.get(column); } - ConcurrentSkipListMap getFacts() + ConcurrentNavigableMap getFacts() { return facts; } @@ -655,45 +654,14 @@ public class IncrementalIndex implements Iterable, Closeable { try { bufferHolder.close(); + db.close(); } catch (IOException e) { throw Throwables.propagate(e); } } - static class DimensionHolder - { - private final Map dimensions; 
- - DimensionHolder() - { - dimensions = Maps.newConcurrentMap(); - } - - void reset() - { - dimensions.clear(); - } - - DimDim add(String dimension) - { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = new DimDim(); - dimensions.put(dimension, holder); - } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); - } - return holder; - } - - DimDim get(String dimension) - { - return dimensions.get(dimension); - } - } - - static class TimeAndDims implements Comparable + static class TimeAndDims implements Comparable, Serializable { private final long timestamp; private final String[][] dims; @@ -778,23 +746,48 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class DimDim + class DimensionHolder + { + private final Map dimensions; + + DimensionHolder() + { + dimensions = Maps.newConcurrentMap(); + } + + void reset() + { + dimensions.clear(); + } + + DimDim add(String dimension) + { + DimDim holder = dimensions.get(dimension); + if (holder == null) { + holder = new DimDim(dimension); + dimensions.put(dimension, holder); + } else { + throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + } + return holder; + } + + DimDim get(String dimension) + { + return dimensions.get(dimension); + } + } + + class DimDim { - private final Map poorMansInterning = Maps.newConcurrentMap(); private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; - public DimDim() + public DimDim(String dimName) { - BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); - falseIds = biMap; - falseIdsReverse = biMap.inverse(); - } - - public String get(String value) - { - return value == null ? 
null : poorMansInterning.get(value); + falseIds = db.createHashMap(dimName).make(); + falseIdsReverse = db.createHashMap(dimName + "_inverse").make(); } public int getId(String value) @@ -807,20 +800,26 @@ public class IncrementalIndex implements Iterable, Closeable return falseIdsReverse.get(id); } + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + public int size() { - return poorMansInterning.size(); + return falseIds.size(); } public Set keySet() { - return poorMansInterning.keySet(); + return falseIds.keySet(); } public synchronized void add(String value) { - poorMansInterning.put(value, value); - falseIds.put(value, falseIds.size()); + final int id = falseIds.size(); + falseIds.put(value, id); + falseIdsReverse.put(id, value); } public int getSortedId(String value) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index d02a8c361b4..214bba923f3 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -488,14 +488,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public ValueMatcher makeValueMatcher(String dimension, String value) + public ValueMatcher makeValueMatcher(final String dimension,final String value) { Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); if (dimIndexObject == null) { return new BooleanValueMatcher(false); } - String idObject = index.getDimension(dimension.toLowerCase()).get(value); - if (idObject == null) { + + if (!index.getDimension(dimension.toLowerCase()).contains(value)) { if (value == null || "".equals(value)) { final int dimIndex = dimIndexObject; @@ -516,7 +516,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final 
int dimIndex = dimIndexObject; - final String id = idObject; return new ValueMatcher() { @@ -529,7 +528,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } for (String dimVal : dims[dimIndex]) { - if (id == dimVal) { + if (value.equals(dimVal)) { return true; } } From e7a5a01f7bb005278a20d4564f4b7727ea74c361 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 9 Sep 2014 16:02:50 +0530 Subject: [PATCH 02/12] cache size locally --- .../io/druid/segment/incremental/IncrementalIndex.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 4b22b71ee36..c34c521c731 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -783,6 +783,8 @@ public class IncrementalIndex implements Iterable, Closeable private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; + // size on MapDB.HTreeMap is slow so maintain a count here + private volatile int size=0; public DimDim(String dimName) { @@ -807,7 +809,7 @@ public class IncrementalIndex implements Iterable, Closeable public int size() { - return falseIds.size(); + return size; } public Set keySet() @@ -817,7 +819,7 @@ public class IncrementalIndex implements Iterable, Closeable public synchronized void add(String value) { - final int id = falseIds.size(); + final int id = size++; falseIds.put(value, id); falseIdsReverse.put(id, value); } @@ -837,7 +839,7 @@ public class IncrementalIndex implements Iterable, Closeable public void sort() { if (sortedVals == null) { - sortedVals = new String[falseIds.size()]; + sortedVals = new String[size]; int index = 0; for (String value : falseIds.keySet()) { From 2b3ad4d742b1001fbfb469747c27e538a1a78f72 Mon Sep 17 00:00:00 
2001 From: nishantmonu51 Date: Tue, 9 Sep 2014 23:20:19 +0530 Subject: [PATCH 03/12] append a random UUID to map name avoid collision with dimension name --- .../java/io/druid/segment/incremental/IncrementalIndex.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c34c521c731..0e94e8b22e3 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -69,6 +69,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -314,7 +315,7 @@ public class IncrementalIndex implements Iterable, Closeable this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make(); - this.facts = db.createTreeMap("facts").make(); + this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).make(); } public IncrementalIndex( From c39eaf870b569ee3c0ae2b2dc1a5f78e49c537e0 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 11 Sep 2014 16:13:37 +0530 Subject: [PATCH 04/12] TimeAndDims optimise to store indexes --- .../segment/incremental/IncrementalIndex.java | 220 +++++++++++------- .../incremental/IncrementalIndexAdapter.java | 22 +- .../IncrementalIndexStorageAdapter.java | 43 ++-- 3 files changed, 170 insertions(+), 115 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 0e94e8b22e3..def34d3a092 100644 --- 
a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -64,11 +64,11 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -314,8 +314,11 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make(); - this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).make(); + db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable().make(); + this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).comparator( + new TimeAndDimsComparator(this) + ).make(); + } public IncrementalIndex( @@ -394,10 +397,10 @@ public class IncrementalIndex implements Iterable, Closeable final List rowDimensions = row.getDimensions(); - String[][] dims; - List overflow = null; + int[][] dims; + List overflow = null; synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; + dims = new int[dimensionOrder.size()][]; for (String dimension : rowDimensions) { dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); @@ -421,9 +424,9 @@ public class IncrementalIndex implements Iterable, Closeable if (overflow == null) { overflow = Lists.newArrayList(); } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + overflow.add(getDimIndexes(dimValues.add(dimension), dimensionValues)); } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + dims[index] = 
getDimIndexes(dimValues.get(dimension), dimensionValues); } } } @@ -431,7 +434,7 @@ public class IncrementalIndex implements Iterable, Closeable if (overflow != null) { // Merge overflow and non-overflow - String[][] newDims = new String[dims.length + overflow.size()][]; + int[][] newDims = new int[dims.length + overflow.size()][]; System.arraycopy(dims, 0, newDims, 0, dims.length); for (int i = 0; i < overflow.size(); ++i) { newDims[dims.length + i] = overflow.get(i); @@ -492,19 +495,20 @@ public class IncrementalIndex implements Iterable, Closeable return facts.lastKey().getTimestamp(); } - private String[] getDimVals(final DimDim dimLookup, final List dimValues) + private int[] getDimIndexes(final DimDim dimLookup, final List dimValues) { - final String[] retVal = new String[dimValues.size()]; - + final int[] retVal = new int[dimValues.size()]; int count = 0; - for (String dimValue : dimValues) { - if (!dimLookup.contains(dimValue)) { - dimLookup.add(dimValue); + final String[] vals = dimValues.toArray(new String[0]); + Arrays.sort(vals); + for (String dimValue : vals) { + int id = dimLookup.getId(dimValue); + if (id == -1) { + id = dimLookup.add(dimValue); } - retVal[count] = dimValue; + retVal[count] = id; count++; } - Arrays.sort(retVal); return retVal; } @@ -622,11 +626,12 @@ public class IncrementalIndex implements Iterable, Closeable final TimeAndDims timeAndDims = input.getKey(); final int rowOffset = input.getValue(); - String[][] theDims = timeAndDims.getDims(); + int[][] theDims = timeAndDims.getDims(); Map theVals = Maps.newLinkedHashMap(); for (int i = 0; i < theDims.length; ++i) { - String[] dim = theDims[i]; + String[] dim = getDimValues(dimValues.get(dimensions.get(i)), theDims[i]); + if (dim != null && dim.length != 0) { theVals.put(dimensions.get(i), dim.length == 1 ? 
dim[0] : Arrays.asList(dim)); } @@ -650,6 +655,23 @@ public class IncrementalIndex implements Iterable, Closeable }; } + public String[] getDimValues(DimDim dims, int[] dimIndexes) + { + if (dimIndexes == null) { + return null; + } + String[] vals = new String[dimIndexes.length]; + for (int i = 0; i < dimIndexes.length; i++) { + vals[i] = dims.getValue(dimIndexes[i]); + } + return vals; + } + + public String getDimValue(DimDim dims, int dimIndex) + { + return dims.getValue(dimIndex); + } + @Override public void close() { @@ -662,14 +684,14 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class TimeAndDims implements Comparable, Serializable + static class TimeAndDims implements Serializable { private final long timestamp; - private final String[][] dims; + private final int[][] dims; TimeAndDims( long timestamp, - String[][] dims + int[][] dims ) { this.timestamp = timestamp; @@ -681,60 +703,21 @@ public class IncrementalIndex implements Iterable, Closeable return timestamp; } - String[][] getDims() + int[][] getDims() { return dims; } - @Override - public int compareTo(TimeAndDims rhs) - { - int retVal = Longs.compare(timestamp, rhs.timestamp); - - if (retVal == 0) { - retVal = Ints.compare(dims.length, rhs.dims.length); - } - - int index = 0; - while (retVal == 0 && index < dims.length) { - String[] lhsVals = dims[index]; - String[] rhsVals = rhs.dims[index]; - - if (lhsVals == null) { - if (rhsVals == null) { - ++index; - continue; - } - return -1; - } - - if (rhsVals == null) { - return 1; - } - - retVal = Ints.compare(lhsVals.length, rhsVals.length); - - int valsIndex = 0; - while (retVal == 0 && valsIndex < lhsVals.length) { - retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]); - ++valsIndex; - } - ++index; - } - - return retVal; - } - @Override public String toString() { return "TimeAndDims{" + "timestamp=" + new DateTime(timestamp) + ", dims=" + Lists.transform( - Arrays.asList(dims), new Function() + Arrays.asList(dims), 
new Function() { @Override - public Object apply(@Nullable String[] input) + public Object apply(@Nullable int[] input) { if (input == null || input.length == 0) { return Arrays.asList("null"); @@ -747,6 +730,67 @@ public class IncrementalIndex implements Iterable, Closeable } } + public static class TimeAndDimsComparator implements Comparator, Serializable + { + // mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert. + private transient final IncrementalIndex incrementalIndex; + + public TimeAndDimsComparator(IncrementalIndex incrementalIndex) + { + this.incrementalIndex = incrementalIndex; + } + + @Override + public int compare(Object o1, Object o2) + { + TimeAndDims lhs = (TimeAndDims) o1; + TimeAndDims rhs = (TimeAndDims) o2; + + int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); + + if (retVal == 0) { + retVal = Ints.compare(lhs.dims.length, rhs.dims.length); + } + + int index = 0; + while (retVal == 0 && index < lhs.dims.length) { + int[] lhsIndexes = lhs.dims[index]; + int[] rhsIndexes = rhs.dims[index]; + + if (lhsIndexes == null) { + if (rhsIndexes == null) { + ++index; + continue; + } + return -1; + } + + if (rhsIndexes == null) { + return 1; + } + + retVal = Ints.compare(lhsIndexes.length, rhsIndexes.length); + + int valsIndex = 0; + + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + + while (retVal == 0 && valsIndex < lhsIndexes.length) { + retVal = incrementalIndex.getDimValue(dimDim, lhsIndexes[valsIndex]).compareTo( + incrementalIndex.getDimValue( + dimDim, + rhsIndexes[valsIndex] + ) + ); + ++valsIndex; + } + ++index; + } + + return retVal; + } + } + class DimensionHolder { private final Map dimensions; @@ -783,19 +827,24 @@ public class IncrementalIndex implements Iterable, Closeable { private final Map falseIds; private final Map falseIdsReverse; - private volatile String[] sortedVals = null; + private final String dimName; + private volatile Map sortedIds = null; + 
private volatile Map sortedIdsReverse = null; // size on MapDB.HTreeMap is slow so maintain a count here - private volatile int size=0; + private volatile int size = 0; public DimDim(String dimName) { - falseIds = db.createHashMap(dimName).make(); + this.dimName = dimName; + + falseIds = db.createTreeMap(dimName).make(); falseIdsReverse = db.createHashMap(dimName + "_inverse").make(); } public int getId(String value) { - return falseIds.get(value); + Integer id = falseIds.get(value); + return id == null ? -1 : id; } public String getValue(int id) @@ -813,46 +862,51 @@ public class IncrementalIndex implements Iterable, Closeable return size; } - public Set keySet() - { - return falseIds.keySet(); - } - - public synchronized void add(String value) + public synchronized int add(String value) { + assertNotSorted(); final int id = size++; falseIds.put(value, id); falseIdsReverse.put(id, value); + return id; } public int getSortedId(String value) { assertSorted(); - return Arrays.binarySearch(sortedVals, value); + return sortedIds.get(value); } public String getSortedValue(int index) { assertSorted(); - return sortedVals[index]; + return sortedIdsReverse.get(index); } public void sort() { - if (sortedVals == null) { - sortedVals = new String[size]; - - int index = 0; + if (sortedIds == null) { + sortedIds = db.createHashMap(dimName + "sorted").make(); + sortedIdsReverse = db.createHashMap(dimName + "sortedInverse").make(); + int i = 0; for (String value : falseIds.keySet()) { - sortedVals[index++] = value; + int sortedIndex = i++; + sortedIds.put(value, sortedIndex); + sortedIdsReverse.put(sortedIndex, value); } - Arrays.sort(sortedVals); } } private void assertSorted() { - if (sortedVals == null) { + if (sortedIds == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + private void assertNotSorted() + { + if (sortedIds != null) { throw new ISE("Call sort() before calling the getSorted* methods."); } } diff --git 
a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 0d7d10d2212..34e71d3c02b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -23,11 +23,9 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; -import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.segment.IndexableAdapter; import io.druid.segment.Rowboat; import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; @@ -65,7 +63,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter int rowNum = 0; for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) { - final String[][] dims = timeAndDims.getDims(); + final int[][] dims = timeAndDims.getDims(); for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); @@ -78,8 +76,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter if (dimIndex >= dims.length || dims[dimIndex] == null) { continue; } - - for (String dimValue : dims[dimIndex]) { + final String[] dimValues = index.getDimValues(index.getDimension(dimension), dims[dimIndex]); + for (String dimValue : dimValues) { ConciseSet conciseSet = conciseSets.get(dimValue); if (conciseSet == null) { @@ -180,27 +178,27 @@ public class IncrementalIndexAdapter implements IndexableAdapter ) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); - final String[][] dimValues = timeAndDims.getDims(); + final int[][] dimValueIndexes = timeAndDims.getDims(); final int rowOffset = 
input.getValue(); - int[][] dims = new int[dimValues.length][]; + int[][] dims = new int[dimValueIndexes.length][]; for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); dimDim.sort(); - if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) { + if (dimIndex >= dimValueIndexes.length || dimValueIndexes[dimIndex] == null) { continue; } - dims[dimIndex] = new int[dimValues[dimIndex].length]; + dims[dimIndex] = new int[dimValueIndexes[dimIndex].length]; if (dimIndex >= dims.length || dims[dimIndex] == null) { continue; } - - for (int i = 0; i < dimValues[dimIndex].length; ++i) { - dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]); + String[] dimValues = index.getDimValues(dimDim, dimValueIndexes[dimIndex]); + for (int i = 0; i < dimValues.length; ++i) { + dims[dimIndex][i] = dimDim.getSortedId(dimValues[i]); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 214bba923f3..507881d2a99 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -29,8 +29,8 @@ import com.metamx.collections.spatial.search.Bound; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularity; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.QueryInterruptedException; +import io.druid.query.aggregation.BufferAggregator; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherFactory; @@ -174,10 +174,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { cursorMap = index.getSubMap( new 
IncrementalIndex.TimeAndDims( - timeStart, new String[][]{} + timeStart, new int[][]{} ), new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{} + Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{} ) ); time = gran.toDateTime(input); @@ -293,12 +293,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { final ArrayList vals = Lists.newArrayList(); if (dimIndex < currEntry.getKey().getDims().length) { - final String[] dimVals = currEntry.getKey().getDims()[dimIndex]; + final int[] dimVals = currEntry.getKey().getDims()[dimIndex]; if (dimVals != null) { - for (String dimVal : dimVals) { - int id = dimValLookup.getId(dimVal); - if (id < maxId) { - vals.add(id); + for (int dimVal : dimVals) { + if (dimVal < maxId) { + vals.add(dimVal); } } } @@ -409,8 +408,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final Integer dimensionIndexInt = index.getDimensionIndex(columnName); - if (dimensionIndexInt != null) { + final IncrementalIndex.DimDim dimDim = index.getDimension(columnName); final int dimensionIndex = dimensionIndexInt; return new ObjectColumnSelector() { @@ -423,7 +422,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; + final String[] dimVals = index.getDimValues(dimDim, currEntry.getKey().getDims()[dimensionIndex]); + if (dimVals.length == 1) { return dimVals[0]; } else if (dimVals.length == 0) { @@ -488,15 +488,16 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public ValueMatcher makeValueMatcher(final String dimension,final String value) + public ValueMatcher makeValueMatcher(final String dimension,String valueParam) { + final String value = valueParam == null ? 
"" : valueParam; Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); if (dimIndexObject == null) { return new BooleanValueMatcher(false); } if (!index.getDimension(dimension.toLowerCase()).contains(value)) { - if (value == null || "".equals(value)) { + if ("".equals(value)) { final int dimIndex = dimIndexObject; return new ValueMatcher() @@ -504,7 +505,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return true; } @@ -516,18 +517,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { if (value.equals(dimVal)) { return true; } @@ -545,18 +547,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new BooleanValueMatcher(false); } final int dimIndex = dimIndexObject; - + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); return new ValueMatcher() { @Override public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { if (predicate.apply(dimVal)) { return true; } @@ -580,12 +582,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override 
public boolean matches() { - String[][] dims = holder.getKey().getDims(); + int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); - for (String dimVal : dims[dimIndex]) { + for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); float[] coords = new float[stringCoords.size()]; for (int j = 0; j < coords.length; j++) { From b94c5f0bd6b23050ae5b751599ddab835f1ec6d6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 12 Sep 2014 22:32:40 +0530 Subject: [PATCH 05/12] use putIfAbsent instead of get and put use putIfAbsent instead of get and put. reduce no. of calls to mapdb decreases latency. --- .../segment/incremental/IncrementalIndex.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index def34d3a092..fe6d5fbf7fd 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -70,6 +70,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -431,7 +432,6 @@ public class IncrementalIndex implements Iterable, Closeable } } - if (overflow != null) { // Merge overflow and non-overflow int[][] newDims = new int[dims.length + overflow.size()][]; @@ -443,21 +443,24 @@ public class IncrementalIndex implements Iterable, Closeable } final TimeAndDims key = new 
TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims); - + Integer rowOffset; synchronized (this) { - if (!facts.containsKey(key)) { - int rowOffset = totalAggSize * numEntries.getAndIncrement(); + rowOffset = totalAggSize * numEntries.get(); + final Integer prev = facts.putIfAbsent(key, rowOffset); + if (prev != null) { + rowOffset = prev; + } else { if (rowOffset + totalAggSize > bufferHolder.get().limit()) { + facts.remove(key); throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get()); } + numEntries.incrementAndGet(); for (int i = 0; i < aggs.length; i++) { aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i)); } - facts.put(key, rowOffset); } } in.set(row); - int rowOffset = facts.get(key); for (int i = 0; i < aggs.length; i++) { synchronized (aggs[i]) { aggs[i].aggregate(bufferHolder.get(), getMetricPosition(rowOffset, i)); From 19bc77134af89a863433d4cac3816ba3f741ecbe Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 12 Sep 2014 23:39:02 +0530 Subject: [PATCH 06/12] use hash map and in memory sort during index generation use hash map and in memory sort during index generation --- .../segment/incremental/IncrementalIndex.java | 37 ++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index fe6d5fbf7fd..7c745ba79bb 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -830,17 +830,13 @@ public class IncrementalIndex implements Iterable, Closeable { private final Map falseIds; private final Map falseIdsReverse; - private final String dimName; - private volatile Map sortedIds = null; - private volatile Map sortedIdsReverse = null; - // size on MapDB.HTreeMap is slow so maintain a 
count here + private volatile String[] sortedVals = null; + // size on MapDB is slow so maintain a count here private volatile int size = 0; public DimDim(String dimName) { - this.dimName = dimName; - - falseIds = db.createTreeMap(dimName).make(); + falseIds = db.createHashMap(dimName).make(); falseIdsReverse = db.createHashMap(dimName + "_inverse").make(); } @@ -867,7 +863,6 @@ public class IncrementalIndex implements Iterable, Closeable public synchronized int add(String value) { - assertNotSorted(); final int id = size++; falseIds.put(value, id); falseIdsReverse.put(id, value); @@ -877,39 +872,31 @@ public class IncrementalIndex implements Iterable, Closeable public int getSortedId(String value) { assertSorted(); - return sortedIds.get(value); + return Arrays.binarySearch(sortedVals, value); } public String getSortedValue(int index) { assertSorted(); - return sortedIdsReverse.get(index); + return sortedVals[index]; } public void sort() { - if (sortedIds == null) { - sortedIds = db.createHashMap(dimName + "sorted").make(); - sortedIdsReverse = db.createHashMap(dimName + "sortedInverse").make(); - int i = 0; + if (sortedVals == null) { + sortedVals = new String[falseIds.size()]; + + int index = 0; for (String value : falseIds.keySet()) { - int sortedIndex = i++; - sortedIds.put(value, sortedIndex); - sortedIdsReverse.put(sortedIndex, value); + sortedVals[index++] = value; } + Arrays.sort(sortedVals); } } private void assertSorted() { - if (sortedIds == null) { - throw new ISE("Call sort() before calling the getSorted* methods."); - } - } - - private void assertNotSorted() - { - if (sortedIds != null) { + if (sortedVals == null) { throw new ISE("Call sort() before calling the getSorted* methods."); } } From 8c16377e9ee230093c8b501c5aa5405ea25b6533 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Fri, 19 Sep 2014 23:09:31 +0530 Subject: [PATCH 07/12] add custom serializer add custom serializers --- .../segment/incremental/IncrementalIndex.java | 287 
+++++++++++------- .../incremental/IncrementalIndexAdapter.java | 20 +- .../IncrementalIndexStorageAdapter.java | 41 ++- .../druid/segment/incremental/InternUtil.java | 45 +++ 4 files changed, 247 insertions(+), 146 deletions(-) create mode 100644 processing/src/main/java/io/druid/segment/incremental/InternUtil.java diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 7c745ba79bb..a64e5af9ef9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -54,11 +54,16 @@ import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.mapdb.BTreeKeySerializer; +import org.mapdb.CC; import org.mapdb.DB; import org.mapdb.DBMaker; +import org.mapdb.Serializer; import javax.annotation.Nullable; import java.io.Closeable; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; @@ -315,11 +320,18 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable().make(); - this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).comparator( - new TimeAndDimsComparator(this) + db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize( + Integer.getInteger( + "cacheSize", + CC.DEFAULT_CACHE_SIZE + ) ).make(); - + final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); + this.facts = db.createTreeMap("__facts" + UUID.randomUUID()) + .keySerializer(timeAndDimsSerializer) + 
.comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); } public IncrementalIndex( @@ -398,10 +410,10 @@ public class IncrementalIndex implements Iterable, Closeable final List rowDimensions = row.getDimensions(); - int[][] dims; - List overflow = null; + String[][] dims; + List overflow = null; synchronized (dimensionOrder) { - dims = new int[dimensionOrder.size()][]; + dims = new String[dimensionOrder.size()][]; for (String dimension : rowDimensions) { dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); @@ -425,16 +437,17 @@ public class IncrementalIndex implements Iterable, Closeable if (overflow == null) { overflow = Lists.newArrayList(); } - overflow.add(getDimIndexes(dimValues.add(dimension), dimensionValues)); + overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); } else { - dims[index] = getDimIndexes(dimValues.get(dimension), dimensionValues); + dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); } } } + if (overflow != null) { // Merge overflow and non-overflow - int[][] newDims = new int[dims.length + overflow.size()][]; + String[][] newDims = new String[dims.length + overflow.size()][]; System.arraycopy(dims, 0, newDims, 0, dims.length); for (int i = 0; i < overflow.size(); ++i) { newDims[dims.length + i] = overflow.get(i); @@ -498,20 +511,18 @@ public class IncrementalIndex implements Iterable, Closeable return facts.lastKey().getTimestamp(); } - private int[] getDimIndexes(final DimDim dimLookup, final List dimValues) + private String[] getDimVals(final DimDim dimLookup, final List dimValues) { - final int[] retVal = new int[dimValues.size()]; + final String[] retVal = new String[dimValues.size()]; + int count = 0; - final String[] vals = dimValues.toArray(new String[0]); - Arrays.sort(vals); - for (String dimValue : vals) { - int id = dimLookup.getId(dimValue); - if (id == -1) { - id = dimLookup.add(dimValue); - } - retVal[count] = id; + 
for (String dimValue : dimValues) { + String canonicalValue = InternUtil.intern(dimValue); + dimLookup.addIfAbsent(canonicalValue); + retVal[count] = canonicalValue; count++; } + Arrays.sort(retVal); return retVal; } @@ -629,12 +640,11 @@ public class IncrementalIndex implements Iterable, Closeable final TimeAndDims timeAndDims = input.getKey(); final int rowOffset = input.getValue(); - int[][] theDims = timeAndDims.getDims(); + String[][] theDims = timeAndDims.getDims(); Map theVals = Maps.newLinkedHashMap(); for (int i = 0; i < theDims.length; ++i) { - String[] dim = getDimValues(dimValues.get(dimensions.get(i)), theDims[i]); - + String[] dim = theDims[i]; if (dim != null && dim.length != 0) { theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); } @@ -658,23 +668,6 @@ public class IncrementalIndex implements Iterable, Closeable }; } - public String[] getDimValues(DimDim dims, int[] dimIndexes) - { - if (dimIndexes == null) { - return null; - } - String[] vals = new String[dimIndexes.length]; - for (int i = 0; i < dimIndexes.length; i++) { - vals[i] = dims.getValue(dimIndexes[i]); - } - return vals; - } - - public String getDimValue(DimDim dims, int dimIndex) - { - return dims.getValue(dimIndex); - } - @Override public void close() { @@ -687,52 +680,6 @@ public class IncrementalIndex implements Iterable, Closeable } } - static class TimeAndDims implements Serializable - { - private final long timestamp; - private final int[][] dims; - - TimeAndDims( - long timestamp, - int[][] dims - ) - { - this.timestamp = timestamp; - this.dims = dims; - } - - long getTimestamp() - { - return timestamp; - } - - int[][] getDims() - { - return dims; - } - - @Override - public String toString() - { - return "TimeAndDims{" + - "timestamp=" + new DateTime(timestamp) + - ", dims=" + Lists.transform( - Arrays.asList(dims), new Function() - { - @Override - public Object apply(@Nullable int[] input) - { - if (input == null || input.length == 0) { - return 
Arrays.asList("null"); - } - return Arrays.asList(input); - } - } - ) + - '}'; - } - } - public static class TimeAndDimsComparator implements Comparator, Serializable { // mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert. @@ -746,45 +693,65 @@ public class IncrementalIndex implements Iterable, Closeable @Override public int compare(Object o1, Object o2) { - TimeAndDims lhs = (TimeAndDims) o1; - TimeAndDims rhs = (TimeAndDims) o2; + return ((TimeAndDims) o1).compareTo((io.druid.segment.incremental.IncrementalIndex.TimeAndDims) o2); + } + } - int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); + static class TimeAndDims implements Comparable, Serializable + { + private final long timestamp; + private final String[][] dims; + + TimeAndDims( + long timestamp, + String[][] dims + ) + { + this.timestamp = timestamp; + this.dims = dims; + } + + long getTimestamp() + { + return timestamp; + } + + String[][] getDims() + { + return dims; + } + + @Override + public int compareTo(TimeAndDims rhs) + { + int retVal = Longs.compare(timestamp, rhs.timestamp); if (retVal == 0) { - retVal = Ints.compare(lhs.dims.length, rhs.dims.length); + retVal = Ints.compare(dims.length, rhs.dims.length); } int index = 0; - while (retVal == 0 && index < lhs.dims.length) { - int[] lhsIndexes = lhs.dims[index]; - int[] rhsIndexes = rhs.dims[index]; + while (retVal == 0 && index < dims.length) { + String[] lhsVals = dims[index]; + String[] rhsVals = rhs.dims[index]; - if (lhsIndexes == null) { - if (rhsIndexes == null) { + if (lhsVals == null) { + if (rhsVals == null) { ++index; continue; } return -1; } - if (rhsIndexes == null) { + if (rhsVals == null) { return 1; } - retVal = Ints.compare(lhsIndexes.length, rhsIndexes.length); + retVal = Ints.compare(lhsVals.length, rhsVals.length); int valsIndex = 0; - - DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); - - while (retVal == 0 && valsIndex < lhsIndexes.length) { - 
retVal = incrementalIndex.getDimValue(dimDim, lhsIndexes[valsIndex]).compareTo( - incrementalIndex.getDimValue( - dimDim, - rhsIndexes[valsIndex] - ) - ); + while (retVal == 0 && valsIndex < lhsVals.length) { + retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]); ++valsIndex; } ++index; @@ -792,6 +759,92 @@ public class IncrementalIndex implements Iterable, Closeable return retVal; } + + @Override + public String toString() + { + return "TimeAndDims{" + + "timestamp=" + new DateTime(timestamp) + + ", dims=" + Lists.transform( + Arrays.asList(dims), new Function() + { + @Override + public Object apply(@Nullable String[] input) + { + if (input == null || input.length == 0) { + return Arrays.asList("null"); + } + return Arrays.asList(input); + } + } + ) + + '}'; + } + + } + + public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable + { + private final TimeAndDimsComparator comparator; + private final transient IncrementalIndex incrementalIndex; + + TimeAndDimsSerializer(IncrementalIndex incrementalIndex) + { + this.comparator = new TimeAndDimsComparator(incrementalIndex); + this.incrementalIndex = incrementalIndex; + } + + @Override + public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException + { + for (int i = start; i < end; i++) { + TimeAndDims timeAndDim = (TimeAndDims) keys[i]; + out.writeLong(timeAndDim.timestamp); + out.writeInt(timeAndDim.dims.length); + int index = 0; + for (String[] dims : timeAndDim.dims) { + if (dims == null) { + out.write(-1); + } else { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + out.writeInt(dims.length); + for (String value : dims) { + out.writeInt(dimDim.getId(value)); + } + } + index++; + } + } + } + + @Override + public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException + { + Object[] ret = new Object[size]; + for (int i = start; i < end; i++) { + final long timeStamp = 
in.readLong(); + final String[][] dims = new String[in.readInt()][]; + for (int k = 0; k < dims.length; k++) { + int len = in.readInt(); + if (len != -1) { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); + String[] col = new String[len]; + for (int l = 0; l < col.length; l++) { + col[l] = InternUtil.intern(dimDim.getValue(in.readInt())); + } + dims[k] = col; + } + } + ret[i] = new TimeAndDims(timeStamp, dims); + } + return ret; + } + + @Override + public Comparator getComparator() + { + return comparator; + } } class DimensionHolder @@ -828,7 +881,7 @@ public class IncrementalIndex implements Iterable, Closeable class DimDim { - private final Map falseIds; + private final ConcurrentMap falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; // size on MapDB is slow so maintain a count here @@ -836,8 +889,11 @@ public class IncrementalIndex implements Iterable, Closeable public DimDim(String dimName) { - falseIds = db.createHashMap(dimName).make(); - falseIdsReverse = db.createHashMap(dimName + "_inverse").make(); + falseIds = db.createHashMap(dimName).keySerializer(Serializer.STRING).valueSerializer(Serializer.INTEGER).make(); + falseIdsReverse = db.createHashMap(dimName + "_inverse") + .keySerializer(Serializer.INTEGER) + .valueSerializer(Serializer.STRING) + .make(); } public int getId(String value) @@ -861,11 +917,14 @@ public class IncrementalIndex implements Iterable, Closeable return size; } - public synchronized int add(String value) + public synchronized int addIfAbsent(String value) { - final int id = size++; - falseIds.put(value, id); - falseIdsReverse.put(id, value); + Integer id = falseIds.putIfAbsent(value, size); + if (id == null) { + falseIdsReverse.put(size, value); + id = size; + size++; + } return id; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java 
b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 34e71d3c02b..1addc0551e1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -63,7 +63,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter int rowNum = 0; for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) { - final int[][] dims = timeAndDims.getDims(); + final String[][] dims = timeAndDims.getDims(); for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); @@ -76,8 +76,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter if (dimIndex >= dims.length || dims[dimIndex] == null) { continue; } - final String[] dimValues = index.getDimValues(index.getDimension(dimension), dims[dimIndex]); - for (String dimValue : dimValues) { + + for (String dimValue : dims[dimIndex]) { ConciseSet conciseSet = conciseSets.get(dimValue); if (conciseSet == null) { @@ -178,27 +178,27 @@ public class IncrementalIndexAdapter implements IndexableAdapter ) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); - final int[][] dimValueIndexes = timeAndDims.getDims(); + final String[][] dimValues = timeAndDims.getDims(); final int rowOffset = input.getValue(); - int[][] dims = new int[dimValueIndexes.length][]; + int[][] dims = new int[dimValues.length][]; for (String dimension : index.getDimensions()) { int dimIndex = index.getDimensionIndex(dimension); final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); dimDim.sort(); - if (dimIndex >= dimValueIndexes.length || dimValueIndexes[dimIndex] == null) { + if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) { continue; } - dims[dimIndex] = new int[dimValueIndexes[dimIndex].length]; + dims[dimIndex] = new int[dimValues[dimIndex].length]; if (dimIndex >= dims.length || dims[dimIndex] == 
null) { continue; } - String[] dimValues = index.getDimValues(dimDim, dimValueIndexes[dimIndex]); - for (int i = 0; i < dimValues.length; ++i) { - dims[dimIndex][i] = dimDim.getSortedId(dimValues[i]); + + for (int i = 0; i < dimValues[dimIndex].length; ++i) { + dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 507881d2a99..5210c04e1d5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -174,10 +174,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { cursorMap = index.getSubMap( new IncrementalIndex.TimeAndDims( - timeStart, new int[][]{} + timeStart, new String[][]{} ), new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{} + Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{} ) ); time = gran.toDateTime(input); @@ -293,11 +293,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { final ArrayList vals = Lists.newArrayList(); if (dimIndex < currEntry.getKey().getDims().length) { - final int[] dimVals = currEntry.getKey().getDims()[dimIndex]; + final String[] dimVals = currEntry.getKey().getDims()[dimIndex]; if (dimVals != null) { - for (int dimVal : dimVals) { - if (dimVal < maxId) { - vals.add(dimVal); + for (String dimVal : dimVals) { + int id = dimValLookup.getId(dimVal); + if (id < maxId) { + vals.add(id); } } } @@ -408,8 +409,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final Integer dimensionIndexInt = index.getDimensionIndex(columnName); + if (dimensionIndexInt != null) { - final IncrementalIndex.DimDim dimDim = 
index.getDimension(columnName); final int dimensionIndex = dimensionIndexInt; return new ObjectColumnSelector() { @@ -422,8 +423,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Object get() { - final String[] dimVals = index.getDimValues(dimDim, currEntry.getKey().getDims()[dimensionIndex]); - + final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex]; if (dimVals.length == 1) { return dimVals[0]; } else if (dimVals.length == 0) { @@ -488,16 +488,15 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public ValueMatcher makeValueMatcher(final String dimension,String valueParam) + public ValueMatcher makeValueMatcher(final String dimension, final String value) { - final String value = valueParam == null ? "" : valueParam; Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); if (dimIndexObject == null) { return new BooleanValueMatcher(false); } if (!index.getDimension(dimension.toLowerCase()).contains(value)) { - if ("".equals(value)) { + if (value == null || "".equals(value)) { final int dimIndex = dimIndexObject; return new ValueMatcher() @@ -505,7 +504,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public boolean matches() { - int[][] dims = holder.getKey().getDims(); + String[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return true; } @@ -517,19 +516,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); return new ValueMatcher() { @Override public boolean matches() { - int[][] dims = holder.getKey().getDims(); + String[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { + for (String dimVal : 
dims[dimIndex]) { if (value.equals(dimVal)) { return true; } @@ -547,18 +545,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new BooleanValueMatcher(false); } final int dimIndex = dimIndexObject; - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); + return new ValueMatcher() { @Override public boolean matches() { - int[][] dims = holder.getKey().getDims(); + String[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { + for (String dimVal : dims[dimIndex]) { if (predicate.apply(dimVal)) { return true; } @@ -582,13 +580,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public boolean matches() { - int[][] dims = holder.getKey().getDims(); + String[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { return false; } - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); - for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) { + for (String dimVal : dims[dimIndex]) { List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); float[] coords = new float[stringCoords.size()]; for (int j = 0; j < coords.length; j++) { diff --git a/processing/src/main/java/io/druid/segment/incremental/InternUtil.java b/processing/src/main/java/io/druid/segment/incremental/InternUtil.java new file mode 100644 index 00000000000..03304dcdce8 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/InternUtil.java @@ -0,0 +1,45 @@ + +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.incremental; + +import java.lang.ref.WeakReference; +import java.util.WeakHashMap; + + +public class InternUtil +{ + + private static final WeakHashMap> cache = + new WeakHashMap>(100000); + + public static String intern(final String str) + { + final WeakReference cached = cache.get(str); + if (cached != null) { + final String value = cached.get(); + if (value != null) { + return value; + } + } + cache.put(str, new WeakReference(str)); + return str; + } +} From 443e5788fb0cdd1cf79680eb219a954cfa6c2b94 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 22 Sep 2014 19:26:10 +0530 Subject: [PATCH 08/12] make OffheapIncrementalIndex tuneable --- .../io/druid/indexer/IndexGeneratorJob.java | 3 +- .../query/groupby/GroupByQueryHelper.java | 4 +- .../segment/incremental/IncrementalIndex.java | 373 +++++++++++------- .../IncrementalIndexStorageAdapter.java | 9 +- .../druid/segment/incremental/InternUtil.java | 45 --- .../io/druid/query/QueryRunnerTestHelper.java | 13 +- .../query/metadata/SegmentAnalyzerTest.java | 2 +- .../test/java/io/druid/segment/TestIndex.java | 22 +- .../filter/SpatialFilterBonusTest.java | 14 +- .../segment/filter/SpatialFilterTest.java | 12 +- .../druid/segment/realtime/plumber/Sink.java | 3 
+- 11 files changed, 284 insertions(+), 216 deletions(-) delete mode 100644 processing/src/main/java/io/druid/segment/incremental/InternUtil.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 1b6878270f4..bf1abde54c7 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -642,7 +642,8 @@ public class IndexGeneratorJob implements Jobby .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) .build(), - new OffheapBufferPool(bufferSize) + new OffheapBufferPool(bufferSize), + false ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index aef4ae26098..3cb263b5b4d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -73,6 +73,7 @@ public class GroupByQueryHelper } } ); + IncrementalIndex index = new IncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart @@ -80,7 +81,8 @@ public class GroupByQueryHelper gran, aggs.toArray(new AggregatorFactory[aggs.size()]), bufferPool, - false + false, + query.getContextValue("useOffheap", false) ); Accumulator accumulator = new Accumulator() diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 9b0dfc43ed3..75bca9123f9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -21,6 +21,8 @@ package io.druid.segment.incremental; import 
com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; @@ -66,6 +68,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -75,8 +78,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -101,6 +105,7 @@ public class IncrementalIndex implements Iterable, Closeable private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; private final DB db; + private final boolean useOffheap; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. 
private ThreadLocal in = new ThreadLocal<>(); @@ -117,7 +122,8 @@ public class IncrementalIndex implements Iterable, Closeable public IncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, StupidPool bufferPool, - final boolean deserializeComplexMetrics + final boolean deserializeComplexMetrics, + final boolean useOffheap ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -320,18 +326,24 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize( - Integer.getInteger( - "cacheSize", - CC.DEFAULT_CACHE_SIZE - ) - ).make(); - final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); - this.facts = db.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); + this.useOffheap = useOffheap; + if (useOffheap) { + db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize( + Integer.getInteger( + "cacheSize", + CC.DEFAULT_CACHE_SIZE + ) + ).make(); + final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); + this.facts = db.createTreeMap("__facts" + UUID.randomUUID()) + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); + } else { + db = null; + this.facts = new ConcurrentSkipListMap<>(); + } } public IncrementalIndex( @@ -347,16 +359,18 @@ public class IncrementalIndex implements Iterable, Closeable .withMetrics(metrics) .build(), bufferPool, - true + true, + false ); } public IncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool + StupidPool bufferPool, + boolean useOffheap ) { - this(incrementalIndexSchema, 
bufferPool, true); + this(incrementalIndexSchema, bufferPool, true, useOffheap); } public IncrementalIndex( @@ -364,7 +378,8 @@ public class IncrementalIndex implements Iterable, Closeable QueryGranularity gran, final AggregatorFactory[] metrics, StupidPool bufferPool, - boolean deserializeComplexMetrics + boolean deserializeComplexMetrics, + boolean useOffheap ) { this( @@ -373,7 +388,8 @@ public class IncrementalIndex implements Iterable, Closeable .withMetrics(metrics) .build(), bufferPool, - deserializeComplexMetrics + deserializeComplexMetrics, + useOffheap ); } @@ -509,9 +525,11 @@ public class IncrementalIndex implements Iterable, Closeable int count = 0; for (String dimValue : dimValues) { - String canonicalValue = InternUtil.intern(dimValue); - dimLookup.addIfAbsent(canonicalValue); - retVal[count] = canonicalValue; + String canonicalDimValue = dimLookup.getCanonicalValue(dimValue); + if (!dimLookup.contains(canonicalDimValue)) { + dimLookup.add(dimValue); + } + retVal[count] = canonicalDimValue; count++; } Arrays.sort(retVal); @@ -665,31 +683,48 @@ public class IncrementalIndex implements Iterable, Closeable { try { bufferHolder.close(); - db.close(); + if (db != null) { + db.close(); + } } catch (IOException e) { throw Throwables.propagate(e); } } - public static class TimeAndDimsComparator implements Comparator, Serializable + class DimensionHolder { - // mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert. 
- private transient final IncrementalIndex incrementalIndex; + private final Map dimensions; - public TimeAndDimsComparator(IncrementalIndex incrementalIndex) + DimensionHolder() { - this.incrementalIndex = incrementalIndex; + dimensions = Maps.newConcurrentMap(); } - @Override - public int compare(Object o1, Object o2) + void reset() { - return ((TimeAndDims) o1).compareTo((io.druid.segment.incremental.IncrementalIndex.TimeAndDims) o2); + dimensions.clear(); + } + + DimDim add(String dimension) + { + DimDim holder = dimensions.get(dimension); + if (holder == null) { + holder = new DimDim(dimension); + dimensions.put(dimension, holder); + } else { + throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + } + return holder; + } + + DimDim get(String dimension) + { + return dimensions.get(dimension); } } - static class TimeAndDims implements Comparable, Serializable + static class TimeAndDims implements Comparable { private final long timestamp; private final String[][] dims; @@ -772,7 +807,164 @@ public class IncrementalIndex implements Iterable, Closeable ) + '}'; } + } + class DimDim + { + private final Interner interner; + private final Map falseIds; + private final Map falseIdsReverse; + private volatile String[] sortedVals = null; + // size on MapDB is slow so maintain a count here + private volatile int size = 0; + + public DimDim(String dimName) + { + if (useOffheap) { + falseIds = db.createHashMap(dimName) + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.INTEGER) + .make(); + falseIdsReverse = db.createHashMap(dimName + "_inverse") + .keySerializer(Serializer.INTEGER) + .valueSerializer(Serializer.STRING) + .make(); + interner = new WeakInterner(); + } else { + BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); + falseIds = biMap; + falseIdsReverse = biMap.inverse(); + interner = new StrongInterner(); + } + } + + public String getCanonicalValue(String value) + { + return 
interner.getCanonicalValue(value); + } + + public int getId(String value) + { + return falseIds.get(value); + } + + public String getValue(int id) + { + return falseIdsReverse.get(id); + } + + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + + public int size() + { + return size; + } + + public synchronized int add(String value) + { + int id = size++; + falseIds.put(value, id); + if (!useOffheap) { + // onheap implementation uses a Bimap. + falseIdsReverse.put(id, value); + } + return id; + } + + public int getSortedId(String value) + { + assertSorted(); + return Arrays.binarySearch(sortedVals, value); + } + + public String getSortedValue(int index) + { + assertSorted(); + return sortedVals[index]; + } + + public void sort() + { + if (sortedVals == null) { + sortedVals = new String[falseIds.size()]; + + int index = 0; + for (String value : falseIds.keySet()) { + sortedVals[index++] = value; + } + Arrays.sort(sortedVals); + } + } + + private void assertSorted() + { + if (sortedVals == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + public boolean compareCannonicalValues(String s1, String s2) + { + return interner.compareCanonicalValues(s1, s2); + } + } + + private static interface Interner + { + public String getCanonicalValue(String str); + + public boolean compareCanonicalValues(String s1, String s2); + } + + private static class WeakInterner implements Interner + { + private static final WeakHashMap> cache = + new WeakHashMap(); + + @Override + public String getCanonicalValue(String str) + { + final WeakReference cached = cache.get(str); + if (cached != null) { + final String value = cached.get(); + if (value != null) { + return value; + } + } + cache.put(str, new WeakReference(str)); + return str; + } + + @Override + public boolean compareCanonicalValues(String s1, String s2) + { + return s1.equals(s2); + } + } + + private static class StrongInterner implements Interner + { + final 
Map poorMansInterning = Maps.newConcurrentMap(); + + @Override + public String getCanonicalValue(String str) + { + String value = poorMansInterning.get(str); + if (value != null) { + return value; + } + poorMansInterning.put(str, str); + return str; + } + + @Override + public boolean compareCanonicalValues(String s1, String s2) + { + return s1 == s2; + } } public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable @@ -782,7 +974,7 @@ public class IncrementalIndex implements Iterable, Closeable TimeAndDimsSerializer(IncrementalIndex incrementalIndex) { - this.comparator = new TimeAndDimsComparator(incrementalIndex); + this.comparator = new TimeAndDimsComparator(); this.incrementalIndex = incrementalIndex; } @@ -822,7 +1014,7 @@ public class IncrementalIndex implements Iterable, Closeable DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); String[] col = new String[len]; for (int l = 0; l < col.length; l++) { - col[l] = InternUtil.intern(dimDim.getValue(in.readInt())); + col[l] = dimDim.getCanonicalValue(dimDim.getValue(in.readInt())); } dims[k] = col; } @@ -839,117 +1031,12 @@ public class IncrementalIndex implements Iterable, Closeable } } - class DimensionHolder + public static class TimeAndDimsComparator implements Comparator, Serializable { - private final Map dimensions; - - DimensionHolder() + @Override + public int compare(Object o1, Object o2) { - dimensions = Maps.newConcurrentMap(); - } - - void reset() - { - dimensions.clear(); - } - - DimDim add(String dimension) - { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = new DimDim(dimension); - dimensions.put(dimension, holder); - } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); - } - return holder; - } - - DimDim get(String dimension) - { - return dimensions.get(dimension); - } - } - - class DimDim - { - private final ConcurrentMap falseIds; - private final Map 
falseIdsReverse; - private volatile String[] sortedVals = null; - // size on MapDB is slow so maintain a count here - private volatile int size = 0; - - public DimDim(String dimName) - { - falseIds = db.createHashMap(dimName).keySerializer(Serializer.STRING).valueSerializer(Serializer.INTEGER).make(); - falseIdsReverse = db.createHashMap(dimName + "_inverse") - .keySerializer(Serializer.INTEGER) - .valueSerializer(Serializer.STRING) - .make(); - } - - public int getId(String value) - { - Integer id = falseIds.get(value); - return id == null ? -1 : id; - } - - public String getValue(int id) - { - return falseIdsReverse.get(id); - } - - public boolean contains(String value) - { - return falseIds.containsKey(value); - } - - public int size() - { - return size; - } - - public synchronized int addIfAbsent(String value) - { - Integer id = falseIds.putIfAbsent(value, size); - if (id == null) { - falseIdsReverse.put(size, value); - id = size; - size++; - } - return id; - } - - public int getSortedId(String value) - { - assertSorted(); - return Arrays.binarySearch(sortedVals, value); - } - - public String getSortedValue(int index) - { - assertSorted(); - return sortedVals[index]; - } - - public void sort() - { - if (sortedVals == null) { - sortedVals = new String[falseIds.size()]; - - int index = 0; - for (String value : falseIds.keySet()) { - sortedVals[index++] = value; - } - Arrays.sort(sortedVals); - } - } - - private void assertSorted() - { - if (sortedVals == null) { - throw new ISE("Call sort() before calling the getSorted* methods."); - } + return ((TimeAndDims) o1).compareTo((TimeAndDims) o2); } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 5210c04e1d5..74d9309152e 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ 
b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -488,14 +488,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } @Override - public ValueMatcher makeValueMatcher(final String dimension, final String value) + public ValueMatcher makeValueMatcher(String dimension, String value) { Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); if (dimIndexObject == null) { return new BooleanValueMatcher(false); } - - if (!index.getDimension(dimension.toLowerCase()).contains(value)) { + final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase()); + if (!dimDim.contains(value)) { if (value == null || "".equals(value)) { final int dimIndex = dimIndexObject; @@ -516,6 +516,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; + final String id = dimDim.getCanonicalValue(value); return new ValueMatcher() { @@ -528,7 +529,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } for (String dimVal : dims[dimIndex]) { - if (value.equals(dimVal)) { + if (dimDim.compareCannonicalValues(id,dimVal)) { return true; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/InternUtil.java b/processing/src/main/java/io/druid/segment/incremental/InternUtil.java deleted file mode 100644 index 03304dcdce8..00000000000 --- a/processing/src/main/java/io/druid/segment/incremental/InternUtil.java +++ /dev/null @@ -1,45 +0,0 @@ - -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.incremental; - -import java.lang.ref.WeakReference; -import java.util.WeakHashMap; - - -public class InternUtil -{ - - private static final WeakHashMap> cache = - new WeakHashMap>(100000); - - public static String intern(final String str) - { - final WeakReference cached = cache.get(str); - if (cached != null) { - final String value = cached.get(); - if (value != null) { - return value; - } - } - cache.put(str, new WeakReference(str)); - return str; - } -} diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index deeaff563e8..b47f78cefb1 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -194,9 +194,10 @@ public class QueryRunnerTestHelper ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true); return Arrays.asList( new Object[][]{ { @@ -207,6 +208,9 @@ public class QueryRunnerTestHelper }, { makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + }, + { + makeQueryRunner(factory, new 
IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); @@ -218,9 +222,11 @@ public class QueryRunnerTestHelper ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); + final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true); + return Arrays.asList( new Object[][]{ { @@ -231,6 +237,9 @@ public class QueryRunnerTestHelper }, { makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) + }, + { + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index d22ba345a0e..503be5e5f6f 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -50,7 +50,7 @@ public class SegmentAnalyzerTest public void testIncrementalDoesNotWork() throws Exception { final List results = getSegmentAnalysises( - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null) + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null) ); Assert.assertEquals(0, results.size()); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 4244c5e38ac..8d0d235c054 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -38,6 +38,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import 
io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -82,7 +83,7 @@ public class TestIndex } } - public static IncrementalIndex getIncrementalTestIndex() + public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap) { synchronized (log) { if (realtimeIndex != null) { @@ -90,7 +91,7 @@ public class TestIndex } } - return realtimeIndex = makeRealtimeIndex("druid.sample.tsv"); + return realtimeIndex = makeRealtimeIndex("druid.sample.tsv", useOffheap); } public static QueryableIndex getMMappedTestIndex() @@ -101,7 +102,7 @@ public class TestIndex } } - IncrementalIndex incrementalIndex = getIncrementalTestIndex(); + IncrementalIndex incrementalIndex = getIncrementalTestIndex(false); mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex); return mmappedIndex; @@ -115,8 +116,8 @@ public class TestIndex } try { - IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top"); - IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom"); + IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false); + IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false); File tmpFile = File.createTempFile("yay", "who"); tmpFile.delete(); @@ -151,14 +152,19 @@ public class TestIndex } } - private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean useOffheap) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); log.info("Realtime loading index file[%s]", resource); final IncrementalIndex retVal = new IncrementalIndex( - new DateTime("2011-01-12T00:00:00.000Z").getMillis(), QueryGranularity.NONE, METRIC_AGGS, - TestQueryRunners.pool + new 
IncrementalIndexSchema.Builder().withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(METRIC_AGGS) + .build(), + TestQueryRunners.pool, + true, + useOffheap ); final AtomicLong startTime = new AtomicLong(); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 12c3ca633f1..4a91f8db3b3 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -125,7 +125,8 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); theIndex.add( new MapBasedInputRow( @@ -256,7 +257,8 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -274,8 +276,8 @@ public class SpatialFilterBonusTest ) ) ).build(), - TestQueryRunners.pool - + TestQueryRunners.pool, + false ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -294,8 +296,8 @@ public class SpatialFilterBonusTest ) ).build(), - TestQueryRunners.pool - + TestQueryRunners.pool, + false ); diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index f9693bdabf8..d9b5b528d2a 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -125,7 +125,8 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); theIndex.add( new 
MapBasedInputRow( @@ -270,7 +271,8 @@ public class SpatialFilterTest ) ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex second = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -289,7 +291,8 @@ public class SpatialFilterTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); IncrementalIndex third = new IncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) @@ -308,7 +311,8 @@ public class SpatialFilterTest ) ).build(), - TestQueryRunners.pool + TestQueryRunners.pool, + false ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 6a557b44822..dceeb5dd49e 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -188,7 +188,8 @@ public class Sink implements Iterable .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) .build(), - new OffheapBufferPool(bufferSize) + new OffheapBufferPool(bufferSize), + false ); FireHydrant old; From 3f66d3c1678d5cf8f9bb62197b5ada43eb03dc44 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Mon, 29 Sep 2014 21:49:12 +0530 Subject: [PATCH 09/12] review comments + mapdb deadlock fix --- .../segment/incremental/IncrementalIndex.java | 27 +++++++++---------- .../IncrementalIndexStorageAdapter.java | 2 +- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 75bca9123f9..e0f47513c05 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -57,7 +57,6 @@ import 
io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; import org.mapdb.BTreeKeySerializer; -import org.mapdb.CC; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; @@ -105,6 +104,7 @@ public class IncrementalIndex implements Iterable, Closeable private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; private final DB db; + private final DB factsDb; private final boolean useOffheap; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. @@ -326,22 +326,20 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.useOffheap = useOffheap; - if (useOffheap) { - db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize( - Integer.getInteger( - "cacheSize", - CC.DEFAULT_CACHE_SIZE - ) - ).make(); + this.useOffheap = true; + if (this.useOffheap) { + final DBMaker dbMaker = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable(); + db = dbMaker.make(); + factsDb = dbMaker.make(); final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); - this.facts = db.createTreeMap("__facts" + UUID.randomUUID()) + this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) .keySerializer(timeAndDimsSerializer) .comparator(timeAndDimsSerializer.getComparator()) .valueSerializer(Serializer.INTEGER) .make(); } else { db = null; + factsDb = null; this.facts = new ConcurrentSkipListMap<>(); } } @@ -525,7 +523,7 @@ public class IncrementalIndex implements Iterable, Closeable int count = 0; for (String dimValue : dimValues) { - String canonicalDimValue = dimLookup.getCanonicalValue(dimValue); + String canonicalDimValue = dimLookup.get(dimValue); if (!dimLookup.contains(canonicalDimValue)) { dimLookup.add(dimValue); } @@ -684,6 +682,7 
@@ public class IncrementalIndex implements Iterable, Closeable try { bufferHolder.close(); if (db != null) { + factsDb.close(); db.close(); } } @@ -838,7 +837,7 @@ public class IncrementalIndex implements Iterable, Closeable } } - public String getCanonicalValue(String value) + public String get(String value) { return interner.getCanonicalValue(value); } @@ -867,7 +866,7 @@ public class IncrementalIndex implements Iterable, Closeable { int id = size++; falseIds.put(value, id); - if (!useOffheap) { + if (useOffheap) { // onheap implementation uses a Bimap. falseIdsReverse.put(id, value); } @@ -1014,7 +1013,7 @@ public class IncrementalIndex implements Iterable, Closeable DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); String[] col = new String[len]; for (int l = 0; l < col.length; l++) { - col[l] = dimDim.getCanonicalValue(dimDim.getValue(in.readInt())); + col[l] = dimDim.get(dimDim.getValue(in.readInt())); } dims[k] = col; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index f867702f3a4..f5b1d8095b5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -519,7 +519,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } final int dimIndex = dimIndexObject; - final String id = dimDim.getCanonicalValue(value); + final String id = dimDim.get(value); return new ValueMatcher() { From 61c7fd2e6e73d7f993e5e0f82a3b59db09d9579b Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Tue, 30 Sep 2014 15:30:02 +0530 Subject: [PATCH 10/12] make ingestOffheap tuneable --- .../io/druid/indexer/HadoopIngestionSpec.java | 1 + .../io/druid/indexer/HadoopTuningConfig.java | 20 ++++++++++++++---- .../io/druid/indexer/IndexGeneratorJob.java | 
5 +++-- .../druid/indexing/common/task/IndexTask.java | 2 +- .../common/task/RealtimeIndexTask.java | 1 + .../segment/incremental/IncrementalIndex.java | 16 ++++++++------ .../indexing/RealtimeTuningConfig.java | 21 +++++++++++++++---- .../segment/realtime/FireDepartment.java | 1 + .../druid/segment/realtime/plumber/Sink.java | 2 +- .../segment/realtime/FireDepartmentTest.java | 2 +- .../segment/realtime/RealtimeManagerTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 1 + 13 files changed, 55 insertions(+), 19 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 028f97d7db5..fec163e4164 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -166,6 +166,7 @@ public class HadoopIngestionSpec extends IngestionSpec jobProperties; private final boolean combineText; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public HadoopTuningConfig( @@ -84,7 +86,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, - final @JsonProperty("persistInHeap") boolean persistInHeap + final @JsonProperty("persistInHeap") boolean persistInHeap, + final @JsonProperty("ingestOffheap") boolean ingestOffheap ) { this.workingPath = workingPath == null ? 
null : workingPath; @@ -101,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig : ImmutableMap.copyOf(jobProperties)); this.combineText = combineText; this.persistInHeap = persistInHeap; + this.ingestOffheap = ingestOffheap; } @JsonProperty @@ -175,6 +179,11 @@ public class HadoopTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -189,7 +198,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -225,7 +236,8 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index bf1abde54c7..74ac61e637c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -634,7 +634,8 @@ public class IndexGeneratorJob implements Jobby for (AggregatorFactory agg : aggs) { aggsSize += agg.getMaxIntermediateSize(); } - int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary(); + final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); + int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary(); return new IncrementalIndex( new IncrementalIndexSchema.Builder() .withMinTimestamp(theBucket.time.getMillis()) @@ -643,7 +644,7 @@ public class IndexGeneratorJob implements Jobby .withMetrics(aggs) .build(), 
new OffheapBufferPool(bufferSize), - false + tuningConfig.isIngestOffheap() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index c477cb5d153..058b3fd027e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -403,7 +403,7 @@ public class IndexTask extends AbstractFixedIntervalTask tmpDir ).findPlumber( schema, - new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null), + new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null), metrics ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index d073d88dc32..906e8e7a901 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -144,6 +144,7 @@ public class RealtimeIndexTask extends AbstractTask rejectionPolicy == null ? 
rejectionPolicyFactory : rejectionPolicy, maxPendingPersists, spec.getShardSpec(), + false, false ), null, null, null, null diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index e0f47513c05..329e91f02ea 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -57,6 +57,7 @@ import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; import org.mapdb.BTreeKeySerializer; +import org.mapdb.CC; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; @@ -326,17 +327,20 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.useOffheap = true; + this.useOffheap = useOffheap; if (this.useOffheap) { - final DBMaker dbMaker = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable(); + final DBMaker dbMaker = DBMaker.newMemoryDirectDB() + .transactionDisable() + .asyncWriteEnable() + .cacheSoftRefEnable(); db = dbMaker.make(); factsDb = dbMaker.make(); final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); } else { db = null; factsDb = null; diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index a4d60bfe77a..fa0c37b2cfd 100644 --- 
a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -45,6 +45,8 @@ public class RealtimeTuningConfig implements TuningConfig private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NoneShardSpec(); private static final boolean defaultPersistInHeap = false; + private static final boolean defaultIngestOffheap = false; + // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -58,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig defaultRejectionPolicyFactory, defaultMaxPendingPersists, defaultShardSpec, - defaultPersistInHeap + defaultPersistInHeap, + defaultIngestOffheap ); } @@ -71,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final boolean persistInHeap; + private final boolean ingestOffheap; @JsonCreator public RealtimeTuningConfig( @@ -82,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, - @JsonProperty("persistInHeap") Boolean persistInHeap + @JsonProperty("persistInHeap") Boolean persistInHeap, + @JsonProperty("ingestOffheap") Boolean ingestOffheap ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -98,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap; + this.ingestOffheap = ingestOffheap == null ? 
defaultIngestOffheap : ingestOffheap; } @JsonProperty @@ -154,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig return persistInHeap; } + @JsonProperty + public boolean isIngestOffheap(){ + return ingestOffheap; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -165,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } @@ -180,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - persistInHeap + persistInHeap, + ingestOffheap ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 6851d693af5..3e2211f51fd 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -97,6 +97,7 @@ public class FireDepartment extends IngestionSpec .withMetrics(schema.getAggregators()) .build(), new OffheapBufferPool(bufferSize), - false + config.isIngestOffheap() ); FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 4f9cb51ecc9..549574b238e 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -77,7 +77,7 @@ public class FireDepartmentTest ) ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, false + null, null, null, null, null, null, null, null, false, false ), null, null, null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java 
index d8c86386b8e..b6619b35f38 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -117,6 +117,7 @@ public class RealtimeManagerTest null, null, null, + null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index c120d31451e..c037d5b2b13 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -163,6 +163,7 @@ public class RealtimePlumberSchoolTest rejectionPolicy, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 227f753b114..8fed5962f54 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -64,6 +64,7 @@ public class SinkTest null, null, null, + false, false ); final Sink sink = new Sink(interval, schema, tuningConfig, version); From ad75a21040def8a7a70a63ef0cf41a2bb203043d Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 1 Oct 2014 13:58:51 +0530 Subject: [PATCH 11/12] separate offheapIncrementalIndex implementation --- .../io/druid/indexer/IndexGeneratorJob.java | 28 +- .../query/groupby/GroupByQueryHelper.java | 20 +- .../segment/incremental/IncrementalIndex.java | 259 ++++-------------- .../incremental/OffheapIncrementalIndex.java | 255 +++++++++++++++++ .../test/java/io/druid/segment/TestIndex.java | 39 +-- .../druid/segment/realtime/plumber/Sink.java | 29 +- 6 files changed, 381 insertions(+), 249 deletions(-) 
create mode 100644 processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 74ac61e637c..5f33d677014 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -45,6 +45,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configurable; @@ -636,16 +637,23 @@ public class IndexGeneratorJob implements Jobby } final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary(); - return new IncrementalIndex( - new IncrementalIndexSchema.Builder() - .withMinTimestamp(theBucket.time.getMillis()) - .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) - .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) - .withMetrics(aggs) - .build(), - new OffheapBufferPool(bufferSize), - tuningConfig.isIngestOffheap() - ); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) + .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) + .withMetrics(aggs) + .build(); + if (tuningConfig.isIngestOffheap()) { + return new OffheapIncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } else { + return new IncrementalIndex( + indexSchema, + new 
OffheapBufferPool(bufferSize) + ); + } } private void createNewZipEntry(ZipOutputStream out, String name) throws IOException diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 3cb263b5b4d..a3ca6dbee72 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OffheapIncrementalIndex; import java.nio.ByteBuffer; import java.util.List; @@ -73,17 +74,28 @@ public class GroupByQueryHelper } } ); - - IncrementalIndex index = new IncrementalIndex( + final IncrementalIndex index; + if(query.getContextValue("useOffheap", false)){ + index = new OffheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + bufferPool, + false + ); + } else { + index = new IncrementalIndex( // use granularity truncated min timestamp // since incoming truncated timestamps may precede timeStart granTimeStart, gran, aggs.toArray(new AggregatorFactory[aggs.size()]), bufferPool, - false, - query.getContextValue("useOffheap", false) + false ); + } Accumulator accumulator = new Accumulator() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 96584317c62..320f6f8648c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -56,29 +56,18 @@ import 
io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.mapdb.BTreeKeySerializer; -import org.mapdb.CC; -import org.mapdb.DB; -import org.mapdb.DBMaker; -import org.mapdb.Serializer; import javax.annotation.Nullable; import java.io.Closeable; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.io.Serializable; -import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -99,14 +88,11 @@ public class IncrementalIndex implements Iterable, Closeable private final int[] aggPositionOffsets; private final int totalAggSize; private final LinkedHashMap dimensionOrder; - private final CopyOnWriteArrayList dimensions; + protected final CopyOnWriteArrayList dimensions; private final DimensionHolder dimValues; private final Map columnCapabilities; private final ConcurrentNavigableMap facts; private final ResourceHolder bufferHolder; - private final DB db; - private final DB factsDb; - private final boolean useOffheap; private volatile AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. 
private ThreadLocal in = new ThreadLocal<>(); @@ -123,8 +109,7 @@ public class IncrementalIndex implements Iterable, Closeable public IncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, StupidPool bufferPool, - final boolean deserializeComplexMetrics, - final boolean useOffheap + final boolean deserializeComplexMetrics ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -327,25 +312,11 @@ public class IncrementalIndex implements Iterable, Closeable } this.bufferHolder = bufferPool.take(); this.dimValues = new DimensionHolder(); - this.useOffheap = useOffheap; - if (this.useOffheap) { - final DBMaker dbMaker = DBMaker.newMemoryDirectDB() - .transactionDisable() - .asyncWriteEnable() - .cacheSoftRefEnable(); - db = dbMaker.make(); - factsDb = dbMaker.make(); - final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); - this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID()) - .keySerializer(timeAndDimsSerializer) - .comparator(timeAndDimsSerializer.getComparator()) - .valueSerializer(Serializer.INTEGER) - .make(); - } else { - db = null; - factsDb = null; - this.facts = new ConcurrentSkipListMap<>(); - } + this.facts = createFactsTable(); + } + + protected ConcurrentNavigableMap createFactsTable() { + return new ConcurrentSkipListMap<>(); } public IncrementalIndex( @@ -361,18 +332,16 @@ public class IncrementalIndex implements Iterable, Closeable .withMetrics(metrics) .build(), bufferPool, - true, - false + true ); } public IncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, - StupidPool bufferPool, - boolean useOffheap + StupidPool bufferPool ) { - this(incrementalIndexSchema, bufferPool, true, useOffheap); + this(incrementalIndexSchema, bufferPool, true); } public IncrementalIndex( @@ -380,8 +349,7 @@ public class IncrementalIndex implements Iterable, Closeable QueryGranularity gran, final AggregatorFactory[] metrics, StupidPool bufferPool, - boolean deserializeComplexMetrics, - boolean 
useOffheap + boolean deserializeComplexMetrics ) { this( @@ -390,8 +358,7 @@ public class IncrementalIndex implements Iterable, Closeable .withMetrics(metrics) .build(), bufferPool, - deserializeComplexMetrics, - useOffheap + deserializeComplexMetrics ); } @@ -685,10 +652,6 @@ public class IncrementalIndex implements Iterable, Closeable { try { bufferHolder.close(); - if (db != null) { - factsDb.close(); - db.close(); - } } catch (IOException e) { throw Throwables.propagate(e); @@ -713,7 +676,7 @@ public class IncrementalIndex implements Iterable, Closeable { DimDim holder = dimensions.get(dimension); if (holder == null) { - holder = new DimDim(dimension); + holder = createDimDim(dimension); dimensions.put(dimension, holder); } else { throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); @@ -727,6 +690,10 @@ public class IncrementalIndex implements Iterable, Closeable } } + protected DimDim createDimDim(String dimension){ + return new DimDimImpl(); + } + static class TimeAndDims implements Comparable { private final long timestamp; @@ -812,42 +779,51 @@ public class IncrementalIndex implements Iterable, Closeable } } - class DimDim + static interface DimDim { - private final Interner interner; + public String get(String value); + + public int getId(String value); + + public String getValue(int id); + + public boolean contains(String value); + + public int size(); + + public int add(String value); + + public int getSortedId(String value); + + public String getSortedValue(int index); + + public void sort(); + + public boolean compareCannonicalValues(String s1, String s2); + } + + private static class DimDimImpl implements DimDim{ private final Map falseIds; private final Map falseIdsReverse; private volatile String[] sortedVals = null; - // size on MapDB is slow so maintain a count here - private volatile int size = 0; + final ConcurrentMap poorMansInterning = Maps.newConcurrentMap(); - public DimDim(String dimName) + + public 
DimDimImpl() { - if (useOffheap) { - falseIds = db.createHashMap(dimName) - .keySerializer(Serializer.STRING) - .valueSerializer(Serializer.INTEGER) - .make(); - falseIdsReverse = db.createHashMap(dimName + "_inverse") - .keySerializer(Serializer.INTEGER) - .valueSerializer(Serializer.STRING) - .make(); - interner = new WeakInterner(); - } else { BiMap biMap = Maps.synchronizedBiMap(HashBiMap.create()); falseIds = biMap; falseIdsReverse = biMap.inverse(); - interner = new StrongInterner(); - } } /** * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) */ - public String get(String value) + public String get(String str) { - return interner.getCanonicalValue(value); + String prev = poorMansInterning.putIfAbsent(str, str); + return prev != null ? prev : str; } public int getId(String value) @@ -867,17 +843,13 @@ public class IncrementalIndex implements Iterable, Closeable public int size() { - return size; + return falseIds.size(); } public synchronized int add(String value) { - int id = size++; + int id = falseIds.size(); falseIds.put(value, id); - if (useOffheap) { - // onheap implementation uses a Bimap. 
- falseIdsReverse.put(id, value); - } return id; } @@ -915,138 +887,7 @@ public class IncrementalIndex implements Iterable, Closeable public boolean compareCannonicalValues(String s1, String s2) { - return interner.compareCanonicalValues(s1, s2); - } - } - - private static interface Interner - { - public String getCanonicalValue(String str); - - public boolean compareCanonicalValues(String s1, String s2); - } - - private static class WeakInterner implements Interner - { - private static final WeakHashMap> cache = - new WeakHashMap(); - - @Override - public String getCanonicalValue(String str) - { - final WeakReference cached = cache.get(str); - if (cached != null) { - final String value = cached.get(); - if (value != null) { - return value; - } - } - cache.put(str, new WeakReference(str)); - return str; - } - - @Override - public boolean compareCanonicalValues(String s1, String s2) - { - return s1.equals(s2); - } - } - - private static class StrongInterner implements Interner - { - final Map poorMansInterning = Maps.newConcurrentMap(); - - @Override - public String getCanonicalValue(String str) - { - String value = poorMansInterning.get(str); - if (value != null) { - return value; - } - poorMansInterning.put(str, str); - return str; - } - - @Override - public boolean compareCanonicalValues(String s1, String s2) - { - /** - * using == here instead of .equals() to speed up lookups - */ - return s1 == s2; - } - } - - public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable - { - private final TimeAndDimsComparator comparator; - private final transient IncrementalIndex incrementalIndex; - - TimeAndDimsSerializer(IncrementalIndex incrementalIndex) - { - this.comparator = new TimeAndDimsComparator(); - this.incrementalIndex = incrementalIndex; - } - - @Override - public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException - { - for (int i = start; i < end; i++) { - TimeAndDims timeAndDim = (TimeAndDims) 
keys[i]; - out.writeLong(timeAndDim.timestamp); - out.writeInt(timeAndDim.dims.length); - int index = 0; - for (String[] dims : timeAndDim.dims) { - if (dims == null) { - out.write(-1); - } else { - DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); - out.writeInt(dims.length); - for (String value : dims) { - out.writeInt(dimDim.getId(value)); - } - } - index++; - } - } - } - - @Override - public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException - { - Object[] ret = new Object[size]; - for (int i = start; i < end; i++) { - final long timeStamp = in.readLong(); - final String[][] dims = new String[in.readInt()][]; - for (int k = 0; k < dims.length; k++) { - int len = in.readInt(); - if (len != -1) { - DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); - String[] col = new String[len]; - for (int l = 0; l < col.length; l++) { - col[l] = dimDim.get(dimDim.getValue(in.readInt())); - } - dims[k] = col; - } - } - ret[i] = new TimeAndDims(timeStamp, dims); - } - return ret; - } - - @Override - public Comparator getComparator() - { - return comparator; - } - } - - public static class TimeAndDimsComparator implements Comparator, Serializable - { - @Override - public int compare(Object o1, Object o2) - { - return ((TimeAndDims) o1).compareTo((TimeAndDims) o2); + return s1 ==s2; } } } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java new file mode 100644 index 00000000000..55c388c653a --- /dev/null +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -0,0 +1,255 @@ +package io.druid.segment.incremental; + + +import com.metamx.common.ISE; +import io.druid.collections.StupidPool; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import 
org.mapdb.BTreeKeySerializer; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; +import java.util.UUID; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentNavigableMap; + +public class OffheapIncrementalIndex extends IncrementalIndex +{ + private volatile DB db; + private volatile DB factsDb; + + public OffheapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + StupidPool bufferPool + ) + { + super(incrementalIndexSchema, bufferPool); + } + + public OffheapIncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics, + StupidPool bufferPool, + boolean deserializeComplexMetrics + + ) + { + super(minTimestamp, gran, metrics, bufferPool, deserializeComplexMetrics); + } + + @Override + protected synchronized ConcurrentNavigableMap createFactsTable() + { + if (factsDb == null) { + final DBMaker dbMaker = DBMaker.newMemoryDirectDB() + .transactionDisable() + .asyncWriteEnable() + .cacheSoftRefEnable(); + factsDb = dbMaker.make(); + db = dbMaker.make(); + } + final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this); + return factsDb.createTreeMap("__facts" + UUID.randomUUID()) + .keySerializer(timeAndDimsSerializer) + .comparator(timeAndDimsSerializer.getComparator()) + .valueSerializer(Serializer.INTEGER) + .make(); + } + + @Override + protected DimDim createDimDim(String dimension) + { + return new OffheapDimDim(dimension); + } + + public static class TimeAndDimsSerializer extends BTreeKeySerializer implements Serializable + { + private final TimeAndDimsComparator comparator; + private final transient IncrementalIndex incrementalIndex; + + TimeAndDimsSerializer(IncrementalIndex 
incrementalIndex) + { + this.comparator = new TimeAndDimsComparator(); + this.incrementalIndex = incrementalIndex; + } + + @Override + public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException + { + for (int i = start; i < end; i++) { + TimeAndDims timeAndDim = (TimeAndDims) keys[i]; + out.writeLong(timeAndDim.getTimestamp()); + out.writeInt(timeAndDim.getDims().length); + int index = 0; + for (String[] dims : timeAndDim.getDims()) { + if (dims == null) { + out.writeInt(-1); /* null marker must be a 4-byte int: deserialize() reads it with readInt(), whereas write(int) emits a single byte */ + } else { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index)); + out.writeInt(dims.length); + for (String value : dims) { + out.writeInt(dimDim.getId(value)); + } + } + index++; + } + } + } + + @Override + public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException + { + Object[] ret = new Object[size]; + for (int i = start; i < end; i++) { + final long timeStamp = in.readLong(); + final String[][] dims = new String[in.readInt()][]; + for (int k = 0; k < dims.length; k++) { + int len = in.readInt(); + if (len != -1) { + DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k)); + String[] col = new String[len]; + for (int l = 0; l < col.length; l++) { + col[l] = dimDim.get(dimDim.getValue(in.readInt())); + } + dims[k] = col; + } + } + ret[i] = new TimeAndDims(timeStamp, dims); + } + return ret; + } + + @Override + public Comparator getComparator() + { + return comparator; + } + } + + public static class TimeAndDimsComparator implements Comparator, Serializable + { + @Override + public int compare(Object o1, Object o2) + { + return ((TimeAndDims) o1).compareTo((TimeAndDims) o2); + } + } + + private class OffheapDimDim implements DimDim + { + private final Map falseIds; + private final Map falseIdsReverse; + private final WeakHashMap> cache = + new WeakHashMap(); + private volatile String[] sortedVals = null; + // size on MapDB is slow so maintain a count here + private
volatile int size = 0; + + public OffheapDimDim(String dimension) + { + falseIds = db.createHashMap(dimension) + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.INTEGER) + .make(); + falseIdsReverse = db.createHashMap(dimension + "_inverse") + .keySerializer(Serializer.INTEGER) + .valueSerializer(Serializer.STRING) + .make(); + } + + /** + * Returns the interned String value to allow fast comparisons using `==` instead of `.equals()` + * + * @see io.druid.segment.incremental.IncrementalIndexStorageAdapter.EntryHolderValueMatcherFactory#makeValueMatcher(String, String) + */ + public String get(String str) + { + final WeakReference cached = cache.get(str); + if (cached != null) { + final String value = cached.get(); + if (value != null) { + return value; + } + } + cache.put(str, new WeakReference(str)); + return str; + } + + public int getId(String value) + { + return falseIds.get(value); + } + + public String getValue(int id) + { + return falseIdsReverse.get(id); + } + + public boolean contains(String value) + { + return falseIds.containsKey(value); + } + + public int size() + { + return size; + } + + public synchronized int add(String value) + { + int id = size++; + falseIds.put(value, id); + falseIdsReverse.put(id, value); + return id; + } + + public int getSortedId(String value) + { + assertSorted(); + return Arrays.binarySearch(sortedVals, value); + } + + public String getSortedValue(int index) + { + assertSorted(); + return sortedVals[index]; + } + + public void sort() + { + if (sortedVals == null) { + sortedVals = new String[falseIds.size()]; + + int index = 0; + for (String value : falseIds.keySet()) { + sortedVals[index++] = value; + } + Arrays.sort(sortedVals); + } + } + + private void assertSorted() + { + if (sortedVals == null) { + throw new ISE("Call sort() before calling the getSorted* methods."); + } + } + + public boolean compareCannonicalValues(String s1, String s2) + { + return s1.equals(s2); + } + } + +} diff --git 
a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 3f431ba84e6..7180a6edb52 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -38,6 +38,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -53,12 +54,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class TestIndex { - private static final Logger log = new Logger(TestIndex.class); - - private static IncrementalIndex realtimeIndex = null; - private static QueryableIndex mmappedIndex = null; - private static QueryableIndex mergedRealtime = null; - public static final String[] COLUMNS = new String[]{ "ts", "provider", @@ -70,6 +65,7 @@ public class TestIndex }; public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"}; public static final String[] METRICS = new String[]{"iNdEx"}; + private static final Logger log = new Logger(TestIndex.class); private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"); private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]), @@ -82,6 +78,10 @@ public class TestIndex } } + private static IncrementalIndex realtimeIndex = null; + private static QueryableIndex mmappedIndex = null; + private static QueryableIndex mergedRealtime = null; + public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap) { synchronized (log) { @@ -155,16 +155,23 @@ public class 
TestIndex { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); log.info("Realtime loading index file[%s]", resource); - - final IncrementalIndex retVal = new IncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) - .withQueryGranularity(QueryGranularity.NONE) - .withMetrics(METRIC_AGGS) - .build(), - TestQueryRunners.pool, - true, - useOffheap - ); + final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(METRIC_AGGS) + .build(); + final IncrementalIndex retVal; + if (useOffheap) { + retVal = new OffheapIncrementalIndex( + schema, + TestQueryRunners.pool + ); + } else { + retVal = new IncrementalIndex( + schema, + TestQueryRunners.pool + ); + } final AtomicLong startTime = new AtomicLong(); int lineCount; diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index f739f0e8bb5..d3e9a0b26a2 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -31,6 +31,7 @@ import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireHydrant; @@ -180,16 +181,24 @@ public class Sink implements Iterable aggsSize += agg.getMaxIntermediateSize(); } int bufferSize = aggsSize * config.getMaxRowsInMemory(); - IncrementalIndex newIndex = new IncrementalIndex( - new IncrementalIndexSchema.Builder() - 
.withMinTimestamp(minTimestamp) - .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) - .withDimensionsSpec(schema.getParser()) - .withMetrics(schema.getAggregators()) - .build(), - new OffheapBufferPool(bufferSize), - config.isIngestOffheap() - ); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity()) + .withDimensionsSpec(schema.getParser()) + .withMetrics(schema.getAggregators()) + .build(); + final IncrementalIndex newIndex; + if (config.isIngestOffheap()) { + newIndex = new OffheapIncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } else { + newIndex = new IncrementalIndex( + indexSchema, + new OffheapBufferPool(bufferSize) + ); + } final FireHydrant old; synchronized (hydrantLock) { From 09c41afb5bc8fe2b99c2c0706bb9d35220fe47b6 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Wed, 1 Oct 2014 17:52:57 +0530 Subject: [PATCH 12/12] license header --- .../incremental/OffheapIncrementalIndex.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 55c388c653a..7e048c37e70 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.segment.incremental;