fix things for real-time ingestion

This commit is contained in:
fjy 2014-11-10 13:48:21 -08:00
parent c5cc826998
commit fc34858e95
4 changed files with 38 additions and 21 deletions

View File

@ -185,7 +185,7 @@ public class IndexMaker
log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size());
return merge(
Arrays.<IndexableAdapter>asList(new IncrementalIndexAdapter(dataInterval, index)),
Arrays.<IndexableAdapter>asList(new IncrementalIndexAdapter(dataInterval, index, bitmapSerdeFactory.getBitmapFactory())),
index.getMetricAggs(),
outDir,
progress

View File

@ -182,7 +182,13 @@ public class IndexMerger
log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size());
return merge(
Arrays.<IndexableAdapter>asList(new IncrementalIndexAdapter(dataInterval, index)),
Arrays.<IndexableAdapter>asList(
new IncrementalIndexAdapter(
dataInterval,
index,
bitmapSerdeFactory.getBitmapFactory()
)
),
index.getMetricAggs(),
outDir,
progress

View File

@ -21,6 +21,8 @@ package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.segment.IndexableAdapter;
@ -31,9 +33,8 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.IntSet;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
import javax.annotation.Nullable;
import java.util.Iterator;
@ -46,10 +47,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval;
private final IncrementalIndex index;
private final Map<String, Map<String, ConciseSet>> invertedIndexes;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
public IncrementalIndexAdapter(
Interval dataInterval, IncrementalIndex index
Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory
)
{
this.dataInterval = dataInterval;
@ -58,7 +59,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
this.invertedIndexes = Maps.newHashMap();
for (String dimension : index.getDimensions()) {
invertedIndexes.put(dimension, Maps.<String, ConciseSet>newHashMap());
invertedIndexes.put(dimension, Maps.<String, MutableBitmap>newHashMap());
}
int rowNum = 0;
@ -67,9 +68,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
Map<String, ConciseSet> conciseSets = invertedIndexes.get(dimension);
Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension);
if (conciseSets == null || dims == null) {
if (bitmapIndexes == null || dims == null) {
log.error("conciseSets and dims are null!");
continue;
}
@ -78,15 +79,15 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
for (String dimValue : dims[dimIndex]) {
ConciseSet conciseSet = conciseSets.get(dimValue);
MutableBitmap mutableBitmap = bitmapIndexes.get(dimValue);
if (conciseSet == null) {
conciseSet = new ConciseSet();
conciseSets.put(dimValue, conciseSet);
if (mutableBitmap == null) {
mutableBitmap = bitmapFactory.makeEmptyMutableBitmap();
bitmapIndexes.put(dimValue, mutableBitmap);
}
try {
conciseSet.add(rowNum);
mutableBitmap.add(rowNum);
}
catch (Exception e) {
log.info(e.toString());
@ -222,15 +223,15 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public IndexedInts getBitmapIndex(String dimension, String value)
{
Map<String, ConciseSet> dimInverted = invertedIndexes.get(dimension);
Map<String, MutableBitmap> dimInverted = invertedIndexes.get(dimension);
if (dimInverted == null) {
return new EmptyIndexedInts();
}
final ConciseSet conciseSet = dimInverted.get(value);
final MutableBitmap bitmapIndex = dimInverted.get(value);
if (conciseSet == null) {
if (bitmapIndex == null) {
return new EmptyIndexedInts();
}
@ -239,7 +240,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public int size()
{
return conciseSet.size();
return bitmapIndex.size();
}
@Override
@ -253,7 +254,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
return new Iterator<Integer>()
{
IntSet.IntIterator baseIter = conciseSet.iterator();
IntIterator baseIter = bitmapIndex.iterator();
@Override
public boolean hasNext()

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
@ -47,8 +48,17 @@ public class EmptyIndexTest
}
tmpDir.deleteOnExit();
IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0], TestQueryRunners.pool);
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
IncrementalIndex emptyIndex = new IncrementalIndex(
0,
QueryGranularity.NONE,
new AggregatorFactory[0],
TestQueryRunners.pool
);
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
new Interval("2012-08-01/P3D"),
emptyIndex,
new ConciseBitmapFactory()
);
IndexMaker.merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],