add custom serializer

add custom serializers
This commit is contained in:
nishantmonu51 2014-09-19 23:09:31 +05:30
parent 19bc77134a
commit 8c16377e9e
4 changed files with 247 additions and 146 deletions

View File

@ -54,11 +54,16 @@ import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.CC;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
@ -315,11 +320,18 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable().make();
this.facts = db.createTreeMap("__facts" + UUID.randomUUID()).comparator(
new TimeAndDimsComparator(this)
db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize(
Integer.getInteger(
"cacheSize",
CC.DEFAULT_CACHE_SIZE
)
).make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
this.facts = db.createTreeMap("__facts" + UUID.randomUUID())
.keySerializer(timeAndDimsSerializer)
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
}
public IncrementalIndex(
@ -398,10 +410,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
final List<String> rowDimensions = row.getDimensions();
int[][] dims;
List<int[]> overflow = null;
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new int[dimensionOrder.size()][];
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
@ -425,16 +437,17 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimIndexes(dimValues.add(dimension), dimensionValues));
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimIndexes(dimValues.get(dimension), dimensionValues);
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
}
if (overflow != null) {
// Merge overflow and non-overflow
int[][] newDims = new int[dims.length + overflow.size()][];
String[][] newDims = new String[dims.length + overflow.size()][];
System.arraycopy(dims, 0, newDims, 0, dims.length);
for (int i = 0; i < overflow.size(); ++i) {
newDims[dims.length + i] = overflow.get(i);
@ -498,20 +511,18 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return facts.lastKey().getTimestamp();
}
private int[] getDimIndexes(final DimDim dimLookup, final List<String> dimValues)
private String[] getDimVals(final DimDim dimLookup, final List<String> dimValues)
{
final int[] retVal = new int[dimValues.size()];
final String[] retVal = new String[dimValues.size()];
int count = 0;
final String[] vals = dimValues.toArray(new String[0]);
Arrays.sort(vals);
for (String dimValue : vals) {
int id = dimLookup.getId(dimValue);
if (id == -1) {
id = dimLookup.add(dimValue);
}
retVal[count] = id;
for (String dimValue : dimValues) {
String canonicalValue = InternUtil.intern(dimValue);
dimLookup.addIfAbsent(canonicalValue);
retVal[count] = canonicalValue;
count++;
}
Arrays.sort(retVal);
return retVal;
}
@ -629,12 +640,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
int[][] theDims = timeAndDims.getDims();
String[][] theDims = timeAndDims.getDims();
Map<String, Object> theVals = Maps.newLinkedHashMap();
for (int i = 0; i < theDims.length; ++i) {
String[] dim = getDimValues(dimValues.get(dimensions.get(i)), theDims[i]);
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
@ -658,23 +668,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
};
}
public String[] getDimValues(DimDim dims, int[] dimIndexes)
{
if (dimIndexes == null) {
return null;
}
String[] vals = new String[dimIndexes.length];
for (int i = 0; i < dimIndexes.length; i++) {
vals[i] = dims.getValue(dimIndexes[i]);
}
return vals;
}
public String getDimValue(DimDim dims, int dimIndex)
{
return dims.getValue(dimIndex);
}
@Override
public void close()
{
@ -687,52 +680,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
static class TimeAndDims implements Serializable
{
private final long timestamp;
private final int[][] dims;
TimeAndDims(
long timestamp,
int[][] dims
)
{
this.timestamp = timestamp;
this.dims = dims;
}
long getTimestamp()
{
return timestamp;
}
int[][] getDims()
{
return dims;
}
@Override
public String toString()
{
return "TimeAndDims{" +
"timestamp=" + new DateTime(timestamp) +
", dims=" + Lists.transform(
Arrays.asList(dims), new Function<int[], Object>()
{
@Override
public Object apply(@Nullable int[] input)
{
if (input == null || input.length == 0) {
return Arrays.asList("null");
}
return Arrays.asList(input);
}
}
) +
'}';
}
}
public static class TimeAndDimsComparator implements Comparator, Serializable
{
// mapdb asserts the comparator to be serializable, ugly workaround to satisfy the assert.
@ -746,45 +693,65 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
@Override
public int compare(Object o1, Object o2)
{
TimeAndDims lhs = (TimeAndDims) o1;
TimeAndDims rhs = (TimeAndDims) o2;
return ((TimeAndDims) o1).compareTo((io.druid.segment.incremental.IncrementalIndex.TimeAndDims) o2);
}
}
int retVal = Longs.compare(lhs.timestamp, rhs.timestamp);
static class TimeAndDims implements Comparable<TimeAndDims>, Serializable
{
private final long timestamp;
private final String[][] dims;
TimeAndDims(
long timestamp,
String[][] dims
)
{
this.timestamp = timestamp;
this.dims = dims;
}
long getTimestamp()
{
return timestamp;
}
String[][] getDims()
{
return dims;
}
@Override
public int compareTo(TimeAndDims rhs)
{
int retVal = Longs.compare(timestamp, rhs.timestamp);
if (retVal == 0) {
retVal = Ints.compare(lhs.dims.length, rhs.dims.length);
retVal = Ints.compare(dims.length, rhs.dims.length);
}
int index = 0;
while (retVal == 0 && index < lhs.dims.length) {
int[] lhsIndexes = lhs.dims[index];
int[] rhsIndexes = rhs.dims[index];
while (retVal == 0 && index < dims.length) {
String[] lhsVals = dims[index];
String[] rhsVals = rhs.dims[index];
if (lhsIndexes == null) {
if (rhsIndexes == null) {
if (lhsVals == null) {
if (rhsVals == null) {
++index;
continue;
}
return -1;
}
if (rhsIndexes == null) {
if (rhsVals == null) {
return 1;
}
retVal = Ints.compare(lhsIndexes.length, rhsIndexes.length);
retVal = Ints.compare(lhsVals.length, rhsVals.length);
int valsIndex = 0;
DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
while (retVal == 0 && valsIndex < lhsIndexes.length) {
retVal = incrementalIndex.getDimValue(dimDim, lhsIndexes[valsIndex]).compareTo(
incrementalIndex.getDimValue(
dimDim,
rhsIndexes[valsIndex]
)
);
while (retVal == 0 && valsIndex < lhsVals.length) {
retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]);
++valsIndex;
}
++index;
@ -792,6 +759,92 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return retVal;
}
/**
 * Renders this key for debugging as {@code TimeAndDims{timestamp=..., dims=[...]}}.
 * Each dimension is printed as a list of its values; a dimension that is
 * {@code null} or has no values is printed as {@code [null]}.
 */
@Override
public String toString()
{
  final List<Object> renderedDims = Lists.newArrayListWithCapacity(dims.length);
  for (String[] values : dims) {
    final boolean absent = values == null || values.length == 0;
    if (absent) {
      // Placeholder so absent dimensions remain visible in the output.
      renderedDims.add(Arrays.asList("null"));
    } else {
      renderedDims.add(Arrays.asList(values));
    }
  }

  final StringBuilder text = new StringBuilder("TimeAndDims{");
  text.append("timestamp=").append(new DateTime(timestamp));
  text.append(", dims=").append(renderedDims);
  text.append('}');
  return text.toString();
}
}
/**
 * MapDB B-tree key serializer for {@link TimeAndDims}.
 *
 * Layout written for each key:
 * <pre>
 *   long  timestamp
 *   int   number of dimensions
 *   per dimension:
 *     int -1                      if the dimension's value array is null
 *     int n, then n int ids       otherwise (ids come from the dimension's DimDim)
 * </pre>
 * Values are stored as the integer ids held by the owning index's {@code DimDim}
 * for that dimension position and are turned back into strings on read.
 */
public static class TimeAndDimsSerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
{
  private final TimeAndDimsComparator comparator;
  private final transient IncrementalIndex incrementalIndex;

  /**
   * @param incrementalIndex index whose dimension dictionaries are used to map values to ids and back
   */
  TimeAndDimsSerializer(IncrementalIndex incrementalIndex)
  {
    this.comparator = new TimeAndDimsComparator(incrementalIndex);
    this.incrementalIndex = incrementalIndex;
  }

  /**
   * Writes {@code keys[start..end)} to {@code out} using the layout described on the class.
   *
   * @throws IOException if the underlying output fails
   */
  @Override
  public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
  {
    for (int i = start; i < end; i++) {
      TimeAndDims timeAndDim = (TimeAndDims) keys[i];
      out.writeLong(timeAndDim.timestamp);
      out.writeInt(timeAndDim.dims.length);
      int index = 0;
      for (String[] dims : timeAndDim.dims) {
        if (dims == null) {
          // Must be a full 4-byte int: deserialize() reads this marker with readInt().
          // DataOutput.write(int) would emit a single byte and misalign the stream.
          out.writeInt(-1);
        } else {
          DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(index));
          out.writeInt(dims.length);
          for (String value : dims) {
            out.writeInt(dimDim.getId(value));
          }
        }
        index++;
      }
    }
  }

  /**
   * Reads keys back into positions {@code [start, end)} of a new array of length {@code size}.
   * A length marker of -1 leaves that dimension's entry {@code null}.
   *
   * @throws IOException if the underlying input fails
   */
  @Override
  public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
  {
    Object[] ret = new Object[size];
    for (int i = start; i < end; i++) {
      final long timeStamp = in.readLong();
      final String[][] dims = new String[in.readInt()][];
      for (int k = 0; k < dims.length; k++) {
        int len = in.readInt();
        if (len != -1) {
          DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
          String[] col = new String[len];
          for (int l = 0; l < col.length; l++) {
            col[l] = InternUtil.intern(dimDim.getValue(in.readInt()));
          }
          dims[k] = col;
        }
      }
      ret[i] = new TimeAndDims(timeStamp, dims);
    }
    return ret;
  }

  /**
   * @return the comparator used to order keys in the B-tree
   */
  @Override
  public Comparator<TimeAndDims> getComparator()
  {
    return comparator;
  }
}
class DimensionHolder
@ -828,7 +881,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
class DimDim
{
private final Map<String, Integer> falseIds;
private final ConcurrentMap<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
// size on MapDB is slow so maintain a count here
@ -836,8 +889,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
public DimDim(String dimName)
{
falseIds = db.createHashMap(dimName).make();
falseIdsReverse = db.createHashMap(dimName + "_inverse").make();
falseIds = db.createHashMap(dimName).keySerializer(Serializer.STRING).valueSerializer(Serializer.INTEGER).make();
falseIdsReverse = db.createHashMap(dimName + "_inverse")
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.STRING)
.make();
}
public int getId(String value)
@ -861,11 +917,14 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return size;
}
public synchronized int add(String value)
public synchronized int addIfAbsent(String value)
{
final int id = size++;
falseIds.put(value, id);
falseIdsReverse.put(id, value);
Integer id = falseIds.putIfAbsent(value, size);
if (id == null) {
falseIdsReverse.put(size, value);
id = size;
size++;
}
return id;
}

View File

@ -63,7 +63,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
int rowNum = 0;
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final int[][] dims = timeAndDims.getDims();
final String[][] dims = timeAndDims.getDims();
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
@ -76,8 +76,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter
if (dimIndex >= dims.length || dims[dimIndex] == null) {
continue;
}
final String[] dimValues = index.getDimValues(index.getDimension(dimension), dims[dimIndex]);
for (String dimValue : dimValues) {
for (String dimValue : dims[dimIndex]) {
ConciseSet conciseSet = conciseSets.get(dimValue);
if (conciseSet == null) {
@ -178,27 +178,27 @@ public class IncrementalIndexAdapter implements IndexableAdapter
)
{
final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
final int[][] dimValueIndexes = timeAndDims.getDims();
final String[][] dimValues = timeAndDims.getDims();
final int rowOffset = input.getValue();
int[][] dims = new int[dimValueIndexes.length][];
int[][] dims = new int[dimValues.length][];
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
dimDim.sort();
if (dimIndex >= dimValueIndexes.length || dimValueIndexes[dimIndex] == null) {
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {
continue;
}
dims[dimIndex] = new int[dimValueIndexes[dimIndex].length];
dims[dimIndex] = new int[dimValues[dimIndex].length];
if (dimIndex >= dims.length || dims[dimIndex] == null) {
continue;
}
String[] dimValues = index.getDimValues(dimDim, dimValueIndexes[dimIndex]);
for (int i = 0; i < dimValues.length; ++i) {
dims[dimIndex][i] = dimDim.getSortedId(dimValues[i]);
for (int i = 0; i < dimValues[dimIndex].length; ++i) {
dims[dimIndex][i] = dimDim.getSortedId(dimValues[dimIndex][i]);
}
}

View File

@ -174,10 +174,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
cursorMap = index.getSubMap(
new IncrementalIndex.TimeAndDims(
timeStart, new int[][]{}
timeStart, new String[][]{}
),
new IncrementalIndex.TimeAndDims(
Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{}
Math.min(actualInterval.getEndMillis(), gran.next(input)), new String[][]{}
)
);
time = gran.toDateTime(input);
@ -293,11 +293,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimIndex < currEntry.getKey().getDims().length) {
final int[] dimVals = currEntry.getKey().getDims()[dimIndex];
final String[] dimVals = currEntry.getKey().getDims()[dimIndex];
if (dimVals != null) {
for (int dimVal : dimVals) {
if (dimVal < maxId) {
vals.add(dimVal);
for (String dimVal : dimVals) {
int id = dimValLookup.getId(dimVal);
if (id < maxId) {
vals.add(id);
}
}
}
@ -408,8 +409,8 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if (dimensionIndexInt != null) {
final IncrementalIndex.DimDim dimDim = index.getDimension(columnName);
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<Object>()
{
@ -422,8 +423,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Object get()
{
final String[] dimVals = index.getDimValues(dimDim, currEntry.getKey().getDims()[dimensionIndex]);
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if (dimVals.length == 1) {
return dimVals[0];
} else if (dimVals.length == 0) {
@ -488,16 +488,15 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
@Override
public ValueMatcher makeValueMatcher(final String dimension,String valueParam)
public ValueMatcher makeValueMatcher(final String dimension, final String value)
{
final String value = valueParam == null ? "" : valueParam;
Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase());
if (dimIndexObject == null) {
return new BooleanValueMatcher(false);
}
if (!index.getDimension(dimension.toLowerCase()).contains(value)) {
if ("".equals(value)) {
if (value == null || "".equals(value)) {
final int dimIndex = dimIndexObject;
return new ValueMatcher()
@ -505,7 +504,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public boolean matches()
{
int[][] dims = holder.getKey().getDims();
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return true;
}
@ -517,19 +516,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int dimIndex = dimIndexObject;
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
return new ValueMatcher()
{
@Override
public boolean matches()
{
int[][] dims = holder.getKey().getDims();
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
for (String dimVal : dims[dimIndex]) {
if (value.equals(dimVal)) {
return true;
}
@ -547,18 +545,18 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
return new ValueMatcher()
{
@Override
public boolean matches()
{
int[][] dims = holder.getKey().getDims();
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
for (String dimVal : dims[dimIndex]) {
if (predicate.apply(dimVal)) {
return true;
}
@ -582,13 +580,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public boolean matches()
{
int[][] dims = holder.getKey().getDims();
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension.toLowerCase());
for (String dimVal : index.getDimValues(dimDim, dims[dimIndex])) {
for (String dimVal : dims[dimIndex]) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {

View File

@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
/**
 * Canonicalizing cache for strings, shared by the whole JVM.
 *
 * Entries are held in a {@link WeakHashMap} whose values are {@link WeakReference}s,
 * so the cache itself never keeps a string strongly reachable.
 */
public class InternUtil
{
  private static final WeakHashMap<String, WeakReference<String>> cache =
      new WeakHashMap<String, WeakReference<String>>(100000);

  /**
   * Returns the cached instance equal to {@code str}, registering {@code str} itself
   * as the canonical instance when none is cached (or the cached one was collected).
   *
   * The method is synchronized because {@link WeakHashMap} is not thread-safe and the
   * cache is static: unsynchronized concurrent callers could corrupt the table or each
   * end up with a different "canonical" instance for equal strings.
   *
   * @param str string to canonicalize
   * @return the canonical instance equal to {@code str}
   */
  public static synchronized String intern(final String str)
  {
    final WeakReference<String> cached = cache.get(str);
    if (cached != null) {
      final String value = cached.get();
      if (value != null) {
        return value;
      }
    }
    cache.put(str, new WeakReference<String>(str));
    return str;
  }
}