skip rows which are added after cursor created (#4049)

* fix can't get dim value via IncrementalIndexStorageAdapter cursor

* address the comment

* add ut

* address ut comments

* fix bug and fix ut
This commit is contained in:
kaijianding 2017-04-26 02:26:46 +08:00 committed by Gian Merlino
parent 4d3745d6c9
commit 336089563d
7 changed files with 234 additions and 22 deletions

View File

@ -29,6 +29,7 @@ import com.google.common.primitives.Ints;
import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap; import io.druid.collections.bitmap.MutableBitmap;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.java.util.common.ISE;
import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn; import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
@ -398,6 +399,8 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
class IndexerDimensionSelector implements DimensionSelector, IdLookup class IndexerDimensionSelector implements DimensionSelector, IdLookup
{ {
private int[] nullIdIntArray;
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
@ -412,27 +415,27 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
int[] row = null; int[] row = null;
int rowSize = 0; int rowSize = 0;
// usually due to currEntry's rowIndex is smaller than the row's rowIndex in which this dim first appears
if (indices == null || indices.length == 0) { if (indices == null || indices.length == 0) {
final int nullId = getEncodedValue(null, false); final int nullId = getEncodedValue(null, false);
if (nullId > -1) { if (nullId > -1) {
if (nullId < maxId) { if (nullIdIntArray == null) {
row = new int[] {nullId}; nullIdIntArray = new int[] {nullId};
}
row = nullIdIntArray;
rowSize = 1; rowSize = 1;
} else { } else {
// doesn't contain nullId, then empty array is used
// Choose to use ArrayBasedIndexedInts later, instead of EmptyIndexedInts, for monomorphism // Choose to use ArrayBasedIndexedInts later, instead of EmptyIndexedInts, for monomorphism
row = IntArrays.EMPTY_ARRAY; row = IntArrays.EMPTY_ARRAY;
rowSize = 0; rowSize = 0;
} }
} }
}
if (row == null && indices != null && indices.length > 0) { if (row == null && indices != null && indices.length > 0) {
row = new int[indices.length]; row = indices;
for (int id : indices) { rowSize = indices.length;
if (id < maxId) {
row[rowSize++] = id;
}
}
} }
return ArrayBasedIndexedInts.of(row, rowSize); return ArrayBasedIndexedInts.of(row, rowSize);
@ -515,6 +518,9 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
@Override @Override
public String lookupName(int id) public String lookupName(int id)
{ {
if (id >= maxId) {
throw new ISE("id[%d] >= maxId[%d]", id, maxId);
}
final String strValue = getActualValue(id, false); final String strValue = getActualValue(id, false);
return extractionFn == null ? strValue : extractionFn.apply(strValue); return extractionFn == null ? strValue : extractionFn.apply(strValue);
} }

View File

@ -316,6 +316,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
Supplier<InputRow> rowSupplier Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException; ) throws IndexSizeExceededException;
public abstract int getLastRowIndex();
protected abstract AggregatorType[] getAggsForRow(int rowOffset); protected abstract AggregatorType[] getAggsForRow(int rowOffset);
protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition); protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);

View File

@ -240,6 +240,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new Cursor() return new Cursor()
{ {
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this); private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this);
private final int maxRowIndex;
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter; private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
private Iterable<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> cursorIterable; private Iterable<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> cursorIterable;
private boolean emptyRange; private boolean emptyRange;
@ -248,6 +249,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
boolean done; boolean done;
{ {
maxRowIndex = index.getLastRowIndex();
cursorIterable = index.getFacts().timeRangeIterable( cursorIterable = index.getFacts().timeRangeIterable(
descending, descending,
timeStart, timeStart,
@ -276,17 +278,20 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
while (baseIter.hasNext()) { while (baseIter.hasNext()) {
BaseQuery.checkInterrupted(); BaseQuery.checkInterrupted();
currEntry.set(baseIter.next()); Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) { if (filterMatcher.matches()) {
return; return;
} }
} }
if (!filterMatcher.matches()) {
done = true; done = true;
} }
}
@Override @Override
public void advanceUninterruptibly() public void advanceUninterruptibly()
@ -301,17 +306,20 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return; return;
} }
currEntry.set(baseIter.next()); Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) {
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) { if (filterMatcher.matches()) {
return; return;
} }
} }
if (!filterMatcher.matches()) {
done = true; done = true;
} }
}
@Override @Override
public void advanceTo(int offset) public void advanceTo(int offset)
@ -350,7 +358,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
boolean foundMatched = false; boolean foundMatched = false;
while (baseIter.hasNext()) { while (baseIter.hasNext()) {
currEntry.set(baseIter.next()); Map.Entry<IncrementalIndex.TimeAndDims, Integer> entry = baseIter.next();
if (beyondMaxRowIndex(entry.getValue())) {
numAdvanced++;
continue;
}
currEntry.set(entry);
if (filterMatcher.matches()) { if (filterMatcher.matches()) {
foundMatched = true; foundMatched = true;
break; break;
@ -362,6 +375,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
done = !foundMatched && (emptyRange || !baseIter.hasNext()); done = !foundMatched && (emptyRange || !baseIter.hasNext());
} }
private boolean beyondMaxRowIndex(int rowIndex) {
// ignore rows whose rowIndex is beyond the maxRowIndex
// rows are order by timestamp, not rowIndex,
// so we still need to go through all rows to skip rows added after cursor created
return rowIndex > maxRowIndex;
}
@Override @Override
public DimensionSelector makeDimensionSelector( public DimensionSelector makeDimensionSelector(
DimensionSpec dimensionSpec DimensionSpec dimensionSpec

View File

@ -276,6 +276,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
return numEntries.get(); return numEntries.get();
} }
@Override
public int getLastRowIndex()
{
return indexIncrement.get() - 1;
}
@Override @Override
public boolean canAppendRow() public boolean canAppendRow()
{ {

View File

@ -215,6 +215,12 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
return numEntries.get(); return numEntries.get();
} }
@Override
public int getLastRowIndex()
{
return indexIncrement.get() - 1;
}
private void factorizeAggs( private void factorizeAggs(
AggregatorFactory[] metrics, AggregatorFactory[] metrics,
Aggregator[] aggs, Aggregator[] aggs,

View File

@ -65,6 +65,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
@ -408,6 +409,7 @@ public class IncrementalIndexStorageAdapterTest
Sequence<Cursor> cursors = sa.makeCursors( Sequence<Cursor> cursors = sa.makeCursors(
null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false
); );
final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0);
Sequences.toList( Sequences.toList(
Sequences.map( Sequences.map(
@ -442,6 +444,7 @@ public class IncrementalIndexStorageAdapterTest
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
int rowNumInCursor = 0;
// and then, cursoring continues in the other thread // and then, cursoring continues in the other thread
while (!cursor.isDone()) { while (!cursor.isDone()) {
IndexedInts row = dimSelector.getRow(); IndexedInts row = dimSelector.getRow();
@ -449,7 +452,10 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertTrue(i < cardinality); Assert.assertTrue(i < cardinality);
} }
cursor.advance(); cursor.advance();
rowNumInCursor++;
} }
Assert.assertEquals(2, rowNumInCursor);
assertCursorsNotEmpty.incrementAndGet();
return null; return null;
} }
@ -457,5 +463,165 @@ public class IncrementalIndexStorageAdapterTest
), ),
new ArrayList<>() new ArrayList<>()
); );
Assert.assertEquals(1, assertCursorsNotEmpty.get());
}
@Test
public void testCursoringAndSnapshot() throws Exception
{
final IncrementalIndex index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v0" + i)
)
);
}
final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursors = sa.makeCursors(
null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false
);
final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0);
Sequences.toList(
Sequences.map(
cursors,
new Function<Cursor, Object>()
{
@Nullable
@Override
public Object apply(Cursor cursor)
{
DimensionSelector dimSelector1A = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
int cardinalityA = dimSelector1A.getValueCardinality();
//index gets more rows at this point, while other thread is iterating over the cursor
try {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v1")
)
);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
DimensionSelector dimSelector1B = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
//index gets more rows at this point, while other thread is iterating over the cursor
try {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v2")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy2"),
ImmutableMap.<String, Object>of("billy2", "v3")
)
);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
DimensionSelector dimSelector1C = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy",
"billy"
)
);
DimensionSelector dimSelector2D = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy2",
"billy2"
)
);
//index gets more rows at this point, while other thread is iterating over the cursor
try {
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "v3")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Lists.newArrayList("billy3"),
ImmutableMap.<String, Object>of("billy3", "")
)
);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
DimensionSelector dimSelector3E = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
"billy3",
"billy3"
)
);
int rowNumInCursor = 0;
// and then, cursoring continues in the other thread
while (!cursor.isDone()) {
IndexedInts rowA = dimSelector1A.getRow();
for (int i : rowA) {
Assert.assertTrue(i < cardinalityA);
}
IndexedInts rowB = dimSelector1B.getRow();
for (int i : rowB) {
Assert.assertTrue(i < cardinalityA);
}
IndexedInts rowC = dimSelector1C.getRow();
for (int i : rowC) {
Assert.assertTrue(i < cardinalityA);
}
IndexedInts rowD = dimSelector2D.getRow();
// no null id, so should get empty dims array
Assert.assertEquals(0, rowD.size());
IndexedInts rowE = dimSelector3E.getRow();
Assert.assertEquals(1, rowE.size());
// the null id
Assert.assertEquals(0, rowE.get(0));
cursor.advance();
rowNumInCursor++;
}
Assert.assertEquals(2, rowNumInCursor);
assertCursorsNotEmpty.incrementAndGet();
return null;
}
}
),
new ArrayList<>()
);
Assert.assertEquals(1, assertCursorsNotEmpty.get());
} }
} }

View File

@ -205,6 +205,12 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
return numEntries.get(); return numEntries.get();
} }
@Override
public int getLastRowIndex()
{
return indexIncrement.get() - 1;
}
} }
@Parameterized.Parameters @Parameterized.Parameters