mirror of https://github.com/apache/druid.git
review comments + mapdb deadlock fix
This commit is contained in:
parent adb4a65e0a
commit 3f66d3c167
@@ -57,7 +57,6 @@ import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.mapdb.BTreeKeySerializer;
-import org.mapdb.CC;
 import org.mapdb.DB;
 import org.mapdb.DBMaker;
 import org.mapdb.Serializer;
@@ -105,6 +104,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
   private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
   private final ResourceHolder<ByteBuffer> bufferHolder;
   private final DB db;
+  private final DB factsDb;
   private final boolean useOffheap;
   private volatile AtomicInteger numEntries = new AtomicInteger();
   // This is modified on add() in a critical section.
@@ -326,22 +326,20 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     }
     this.bufferHolder = bufferPool.take();
     this.dimValues = new DimensionHolder();
-    this.useOffheap = useOffheap;
-    if (useOffheap) {
-      db = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheLRUEnable().cacheSize(
-          Integer.getInteger(
-              "cacheSize",
-              CC.DEFAULT_CACHE_SIZE
-          )
-      ).make();
+    this.useOffheap = true;
+    if (this.useOffheap) {
+      final DBMaker dbMaker = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable();
+      db = dbMaker.make();
+      factsDb = dbMaker.make();
       final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);
-      this.facts = db.createTreeMap("__facts" + UUID.randomUUID())
+      this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
                      .keySerializer(timeAndDimsSerializer)
                      .comparator(timeAndDimsSerializer.getComparator())
                      .valueSerializer(Serializer.INTEGER)
                      .make();
     } else {
       db = null;
+      factsDb = null;
       this.facts = new ConcurrentSkipListMap<>();
     }
   }
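For orientation: the deadlock fix above is the split into two independent MapDB stores, one for the dimension dictionaries (db) and one for the facts B-tree (factsDb), presumably so the two maps no longer share a single engine's async-write and cache machinery. Below is a minimal standalone sketch of that pattern using the same MapDB 1.x maker calls as the patch; the map name and key type here are illustrative, not Druid's.

    import java.util.concurrent.ConcurrentNavigableMap;

    import org.mapdb.DB;
    import org.mapdb.DBMaker;
    import org.mapdb.Serializer;

    public class TwoStoreSketch
    {
      public static void main(String[] args)
      {
        // Same maker settings as the patch; every make() call on the maker
        // builds a fresh, independent direct-memory store.
        final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
                                       .transactionDisable()
                                       .asyncWriteEnable()
                                       .cacheSoftRefEnable();
        final DB dictDb = dbMaker.make();   // stand-in for Druid's `db`
        final DB factsDb = dbMaker.make();  // stand-in for Druid's `factsDb`

        // Illustrative tree map; the real code keys on TimeAndDims with a
        // custom BTreeKeySerializer and comparator.
        final ConcurrentNavigableMap<Integer, Integer> facts =
            factsDb.createTreeMap("facts")
                   .valueSerializer(Serializer.INTEGER)
                   .make();

        facts.put(1, 100);
        System.out.println(facts.firstEntry());

        // Both stores need releasing, mirroring the close() hunk below.
        factsDb.close();
        dictDb.close();
      }
    }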
@@ -525,7 +523,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable

     int count = 0;
     for (String dimValue : dimValues) {
-      String canonicalDimValue = dimLookup.getCanonicalValue(dimValue);
+      String canonicalDimValue = dimLookup.get(dimValue);
       if (!dimLookup.contains(canonicalDimValue)) {
         dimLookup.add(dimValue);
       }
@@ -684,6 +682,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     try {
       bufferHolder.close();
       if (db != null) {
+        factsDb.close();
         db.close();
       }
     }
@@ -838,7 +837,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
       }
     }

-    public String getCanonicalValue(String value)
+    public String get(String value)
     {
       return interner.getCanonicalValue(value);
     }
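The getCanonicalValue-to-get rename is cosmetic; the method still just canonicalizes a string through the interner so equal dimension values share one instance. A hypothetical stand-in with the same contract (this is not Druid's actual interner class):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical interner: returns one canonical String instance per
    // distinct value, so callers can dedupe repeated dimension values.
    class CanonicalValueInterner
    {
      private final ConcurrentMap<String, String> pool = new ConcurrentHashMap<>();

      public String getCanonicalValue(String value)
      {
        final String prior = pool.putIfAbsent(value, value);
        return prior != null ? prior : value;
      }
    }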
@@ -867,7 +866,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
     {
       int id = size++;
       falseIds.put(value, id);
-      if (!useOffheap) {
+      if (useOffheap) {
         // onheap implementation uses a Bimap.
         falseIdsReverse.put(id, value);
       }
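Reading the flipped condition together with the unchanged comment: on-heap, the id map is a BiMap whose inverse view is maintained automatically, so the explicit falseIdsReverse.put(...) is only needed off-heap, and the old !useOffheap guard appears to have had it backwards. A sketch of that distinction, assuming Guava's HashBiMap for the on-heap case (names here are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;

    class IdLookupSketch
    {
      public static void main(String[] args)
      {
        // On-heap: the BiMap keeps the id -> value view in sync by itself.
        final BiMap<String, Integer> falseIds = HashBiMap.create();
        falseIds.put("a", 0);
        System.out.println(falseIds.inverse().get(0)); // a

        // Off-heap style: two plain maps, so the reverse entry must be
        // written explicitly, which is what the guarded put(...) does.
        final Map<String, Integer> offheapIds = new HashMap<>();
        final Map<Integer, String> offheapIdsReverse = new HashMap<>();
        offheapIds.put("a", 0);
        offheapIdsReverse.put(0, "a");
        System.out.println(offheapIdsReverse.get(0)); // a
      }
    }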
@@ -1014,7 +1013,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
       DimDim dimDim = incrementalIndex.getDimension(incrementalIndex.dimensions.get(k));
       String[] col = new String[len];
       for (int l = 0; l < col.length; l++) {
-        col[l] = dimDim.getCanonicalValue(dimDim.getValue(in.readInt()));
+        col[l] = dimDim.get(dimDim.getValue(in.readInt()));
       }
       dims[k] = col;
     }
@@ -519,7 +519,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
     }

     final int dimIndex = dimIndexObject;
-    final String id = dimDim.getCanonicalValue(value);
+    final String id = dimDim.get(value);

     return new ValueMatcher()
     {