From 336089563d6095db76dfe9139920cdba3f98c54c Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 26 Apr 2017 02:26:46 +0800 Subject: [PATCH] skip rows which are added after cursor created (#4049) * fix can't get dim value via IncrementalIndexStorageAdapter cursor * address the comment * add ut * address ut comments * fix bug and fix ut --- .../druid/segment/StringDimensionIndexer.java | 32 ++-- .../segment/incremental/IncrementalIndex.java | 2 + .../IncrementalIndexStorageAdapter.java | 38 +++- .../incremental/OffheapIncrementalIndex.java | 6 + .../incremental/OnheapIncrementalIndex.java | 6 + .../IncrementalIndexStorageAdapterTest.java | 166 ++++++++++++++++++ .../OnheapIncrementalIndexBenchmark.java | 6 + 7 files changed, 234 insertions(+), 22 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index f7cd8b622fc..830547369ad 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -29,6 +29,7 @@ import com.google.common.primitives.Ints; import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.MutableBitmap; import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; +import io.druid.java.util.common.ISE; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.ValueMatcher; @@ -398,6 +399,8 @@ public class StringDimensionIndexer implements DimensionIndexer -1) { - if (nullId < maxId) { - row = new int[] {nullId}; - rowSize = 1; - } else { - // Choose to use ArrayBasedIndexedInts later, instead of EmptyIndexedInts, for monomorphism - row = IntArrays.EMPTY_ARRAY; - rowSize = 0; + if (nullIdIntArray == null) { + nullIdIntArray = new int[] {nullId}; } + row = nullIdIntArray; + rowSize = 1; + } else { + // doesn't contain nullId, then empty array is used + // Choose to use ArrayBasedIndexedInts later, instead of EmptyIndexedInts, for monomorphism + row = IntArrays.EMPTY_ARRAY; + rowSize = 0; } } if (row == null && indices != null && indices.length > 0) { - row = new int[indices.length]; - for (int id : indices) { - if (id < maxId) { - row[rowSize++] = id; - } - } + row = indices; + rowSize = indices.length; } return ArrayBasedIndexedInts.of(row, rowSize); @@ -515,6 +518,9 @@ public class StringDimensionIndexer implements DimensionIndexer= maxId) { + throw new ISE("id[%d] >= maxId[%d]", id, maxId); + } final String strValue = getActualValue(id, false); return extractionFn == null ? strValue : extractionFn.apply(strValue); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index e1b0735909c..1daa24ad61e 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -316,6 +316,8 @@ public abstract class IncrementalIndex implements Iterable, Supplier rowSupplier ) throws IndexSizeExceededException; + public abstract int getLastRowIndex(); + protected abstract AggregatorType[] getAggsForRow(int rowOffset); protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index ea524bbb711..2768d104759 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -240,6 +240,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new Cursor() { private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this); + private final int maxRowIndex; private Iterator> baseIter; private Iterable> cursorIterable; private boolean emptyRange; @@ -248,6 +249,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter boolean done; { + maxRowIndex = index.getLastRowIndex(); cursorIterable = index.getFacts().timeRangeIterable( descending, timeStart, @@ -276,16 +278,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter while (baseIter.hasNext()) { BaseQuery.checkInterrupted(); - currEntry.set(baseIter.next()); + Map.Entry entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getValue())) { + continue; + } + + currEntry.set(entry); if (filterMatcher.matches()) { return; } } - if (!filterMatcher.matches()) { - done = true; - } + done = true; } @Override @@ -301,16 +306,19 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return; } - currEntry.set(baseIter.next()); + Map.Entry entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getValue())) { + continue; + } + + currEntry.set(entry); if (filterMatcher.matches()) { return; } } - if (!filterMatcher.matches()) { - done = true; - } + done = true; } @Override @@ -350,7 +358,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter boolean foundMatched = false; while (baseIter.hasNext()) { - currEntry.set(baseIter.next()); + Map.Entry entry = baseIter.next(); + if (beyondMaxRowIndex(entry.getValue())) { + numAdvanced++; + continue; + } + currEntry.set(entry); if (filterMatcher.matches()) { foundMatched = true; break; @@ -362,6 +375,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter done = !foundMatched && (emptyRange || !baseIter.hasNext()); } + private boolean beyondMaxRowIndex(int rowIndex) { + // ignore rows whose rowIndex is beyond the maxRowIndex + // rows are order by timestamp, not rowIndex, + // so we still need to go through all rows to skip rows added after cursor created + return rowIndex > maxRowIndex; + } + @Override public DimensionSelector makeDimensionSelector( DimensionSpec dimensionSpec diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 891d6704288..059cc41283d 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -276,6 +276,12 @@ public class OffheapIncrementalIndex extends IncrementalIndex return numEntries.get(); } + @Override + public int getLastRowIndex() + { + return indexIncrement.get() - 1; + } + @Override public boolean canAppendRow() { diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 20235d93509..82d572c36d5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -215,6 +215,12 @@ public class OnheapIncrementalIndex extends IncrementalIndex return numEntries.get(); } + @Override + public int getLastRowIndex() + { + return indexIncrement.get() - 1; + } + private void factorizeAggs( AggregatorFactory[] metrics, Aggregator[] aggs, diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 4abff744fc7..7bec8797bd9 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -65,6 +65,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -408,6 +409,7 @@ public class IncrementalIndexStorageAdapterTest Sequence cursors = sa.makeCursors( null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false ); + final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0); Sequences.toList( Sequences.map( @@ -442,6 +444,7 @@ public class IncrementalIndexStorageAdapterTest throw new RuntimeException(ex); } + int rowNumInCursor = 0; // and then, cursoring continues in the other thread while (!cursor.isDone()) { IndexedInts row = dimSelector.getRow(); @@ -449,7 +452,10 @@ public class IncrementalIndexStorageAdapterTest Assert.assertTrue(i < cardinality); } cursor.advance(); + rowNumInCursor++; } + Assert.assertEquals(2, rowNumInCursor); + assertCursorsNotEmpty.incrementAndGet(); return null; } @@ -457,5 +463,165 @@ public class IncrementalIndexStorageAdapterTest ), new ArrayList<>() ); + Assert.assertEquals(1, assertCursorsNotEmpty.get()); + } + + @Test + public void testCursoringAndSnapshot() throws Exception + { + final IncrementalIndex index = indexCreator.createIndex(); + final long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 2; i++) { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v0" + i) + ) + ); + } + + final StorageAdapter sa = new IncrementalIndexStorageAdapter(index); + + Sequence cursors = sa.makeCursors( + null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false + ); + final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0); + + Sequences.toList( + Sequences.map( + cursors, + new Function() + { + @Nullable + @Override + public Object apply(Cursor cursor) + { + DimensionSelector dimSelector1A = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy", + "billy" + ) + ); + int cardinalityA = dimSelector1A.getValueCardinality(); + + //index gets more rows at this point, while other thread is iterating over the cursor + try { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v1") + ) + ); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + + DimensionSelector dimSelector1B = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy", + "billy" + ) + ); + //index gets more rows at this point, while other thread is iterating over the cursor + try { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v2") + ) + ); + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy2"), + ImmutableMap.of("billy2", "v3") + ) + ); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + + DimensionSelector dimSelector1C = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy", + "billy" + ) + ); + + DimensionSelector dimSelector2D = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy2", + "billy2" + ) + ); + //index gets more rows at this point, while other thread is iterating over the cursor + try { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v3") + ) + ); + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy3"), + ImmutableMap.of("billy3", "") + ) + ); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + + DimensionSelector dimSelector3E = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy3", + "billy3" + ) + ); + + int rowNumInCursor = 0; + // and then, cursoring continues in the other thread + while (!cursor.isDone()) { + IndexedInts rowA = dimSelector1A.getRow(); + for (int i : rowA) { + Assert.assertTrue(i < cardinalityA); + } + IndexedInts rowB = dimSelector1B.getRow(); + for (int i : rowB) { + Assert.assertTrue(i < cardinalityA); + } + IndexedInts rowC = dimSelector1C.getRow(); + for (int i : rowC) { + Assert.assertTrue(i < cardinalityA); + } + IndexedInts rowD = dimSelector2D.getRow(); + // no null id, so should get empty dims array + Assert.assertEquals(0, rowD.size()); + IndexedInts rowE = dimSelector3E.getRow(); + Assert.assertEquals(1, rowE.size()); + // the null id + Assert.assertEquals(0, rowE.get(0)); + cursor.advance(); + rowNumInCursor++; + } + Assert.assertEquals(2, rowNumInCursor); + assertCursorsNotEmpty.incrementAndGet(); + + return null; + } + } + ), + new ArrayList<>() + ); + Assert.assertEquals(1, assertCursorsNotEmpty.get()); } } diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 3ff81c454d7..d2819d83475 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -205,6 +205,12 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark return numEntries.get(); } + + @Override + public int getLastRowIndex() + { + return indexIncrement.get() - 1; + } } @Parameterized.Parameters