indexing working with mapdb

This commit is contained in:
nishantmonu51 2014-09-09 15:59:33 +05:30
parent 197c80a694
commit 178f002f05
4 changed files with 71 additions and 64 deletions

View File

@ -429,6 +429,11 @@
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.6</version>
</dependency>
<!-- Test Scope -->
<dependency>

View File

@ -86,6 +86,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
</dependency>
<!-- Tests -->

View File

@ -21,8 +21,6 @@ package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@ -56,10 +54,13 @@ import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -69,7 +70,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@ -91,8 +91,9 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
private final CopyOnWriteArrayList<String> dimensions;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentSkipListMap<TimeAndDims, Integer> facts;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
private volatile AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
@ -312,7 +313,8 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
this.facts = new ConcurrentSkipListMap<>();
db = DBMaker.newMemoryDirectDB().transactionDisable().cacheWeakRefEnable().make();
this.facts = db.createTreeMap("facts").make();
}
public IncrementalIndex(
@ -495,13 +497,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
int count = 0;
for (String dimValue : dimValues) {
String canonicalDimValue = dimLookup.get(dimValue);
if (canonicalDimValue == null) {
canonicalDimValue = dimValue;
if (!dimLookup.contains(dimValue)) {
dimLookup.add(dimValue);
}
retVal[count] = canonicalDimValue;
retVal[count] = dimValue;
count++;
}
Arrays.sort(retVal);
@ -589,7 +588,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return columnCapabilities.get(column);
}
ConcurrentSkipListMap<TimeAndDims, Integer> getFacts()
ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
@ -655,45 +654,14 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
{
try {
bufferHolder.close();
db.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
static class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new DimDim();
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
static class TimeAndDims implements Comparable<TimeAndDims>
static class TimeAndDims implements Comparable<TimeAndDims>, Serializable
{
private final long timestamp;
private final String[][] dims;
@ -778,23 +746,48 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
}
static class DimDim
class DimensionHolder
{
private final Map<String, DimDim> dimensions;
DimensionHolder()
{
dimensions = Maps.newConcurrentMap();
}
void reset()
{
dimensions.clear();
}
DimDim add(String dimension)
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new DimDim(dimension);
dimensions.put(dimension, holder);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;
}
DimDim get(String dimension)
{
return dimensions.get(dimension);
}
}
class DimDim
{
private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
private final Map<String, Integer> falseIds;
private final Map<Integer, String> falseIdsReverse;
private volatile String[] sortedVals = null;
public DimDim()
public DimDim(String dimName)
{
BiMap<String, Integer> biMap = Maps.synchronizedBiMap(HashBiMap.<String, Integer>create());
falseIds = biMap;
falseIdsReverse = biMap.inverse();
}
public String get(String value)
{
return value == null ? null : poorMansInterning.get(value);
falseIds = db.createHashMap(dimName).make();
falseIdsReverse = db.createHashMap(dimName + "_inverse").make();
}
public int getId(String value)
@ -807,20 +800,26 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return falseIdsReverse.get(id);
}
public boolean contains(String value)
{
return falseIds.containsKey(value);
}
public int size()
{
return poorMansInterning.size();
return falseIds.size();
}
public Set<String> keySet()
{
return poorMansInterning.keySet();
return falseIds.keySet();
}
public synchronized void add(String value)
{
poorMansInterning.put(value, value);
falseIds.put(value, falseIds.size());
final int id = falseIds.size();
falseIds.put(value, id);
falseIdsReverse.put(id, value);
}
public int getSortedId(String value)

View File

@ -488,14 +488,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
@Override
public ValueMatcher makeValueMatcher(String dimension, String value)
public ValueMatcher makeValueMatcher(final String dimension,final String value)
{
Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase());
if (dimIndexObject == null) {
return new BooleanValueMatcher(false);
}
String idObject = index.getDimension(dimension.toLowerCase()).get(value);
if (idObject == null) {
if (!index.getDimension(dimension.toLowerCase()).contains(value)) {
if (value == null || "".equals(value)) {
final int dimIndex = dimIndexObject;
@ -516,7 +516,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
final int dimIndex = dimIndexObject;
final String id = idObject;
return new ValueMatcher()
{
@ -529,7 +528,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
for (String dimVal : dims[dimIndex]) {
if (id == dimVal) {
if (value.equals(dimVal)) {
return true;
}
}