diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index aecf8b6764b..3e736a625a2 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -60,6 +60,8 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -361,34 +363,43 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter final int[][] dims = currEntry.getKey().getDims(); int[] indices = dimIndex < dims.length ? dims[dimIndex] : null; - if (indices == null) { - indices = new int[0]; - } - // check for null entry - if (indices.length == 0 && dimValLookup.contains(null)) { - indices = new int[] { dimValLookup.getId(null) }; + + List valsTmp = null; + if ((indices == null || indices.length == 0) && dimValLookup.contains(null)) { + int id = dimValLookup.getId(null); + if (id < maxId) { + valsTmp = new ArrayList<>(1); + valsTmp.add(id); + } + } else if (indices != null && indices.length > 0) { + valsTmp = new ArrayList<>(indices.length); + for (int i = 0; i < indices.length; i++) { + int id = indices[i]; + if (id < maxId) { + valsTmp.add(id); + } + } } - final int[] vals = indices; - + final List vals = valsTmp == null ? Collections.EMPTY_LIST : valsTmp; return new IndexedInts() { @Override public int size() { - return vals.length; + return vals.size(); } @Override public int get(int index) { - return vals[index]; + return vals.get(index); } @Override public Iterator iterator() { - return Ints.asList(vals).iterator(); + return vals.iterator(); } @Override diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 31130d83bf3..4eb5b510177 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -19,6 +19,7 @@ package io.druid.segment.incremental; +import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; @@ -46,6 +47,8 @@ import io.druid.query.topn.TopNQueryEngine; import io.druid.query.topn.TopNResultValue; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; +import io.druid.segment.StorageAdapter; +import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.SelectorFilter; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -54,6 +57,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -376,4 +380,76 @@ public class IncrementalIndexStorageAdapterTest MapBasedRow row = (MapBasedRow) results.get(0); Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent()); } + + @Test + public void testCursoringAndIndexUpdationInterleaving() throws Exception + { + final IncrementalIndex index = indexCreator.createIndex(); + final long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 2; i++) { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v1" + i) + ) + ); + } + + final StorageAdapter sa = new IncrementalIndexStorageAdapter(index); + + Sequence cursors = sa.makeCursors( + null, new Interval(timestamp - 60_000, timestamp + 60_000), QueryGranularity.ALL, false + ); + + Sequences.toList( + Sequences.map( + cursors, + new Function() + { + @Nullable + @Override + public Object apply(Cursor cursor) + { + DimensionSelector dimSelector = cursor.makeDimensionSelector( + new DefaultDimensionSpec( + "billy", + "billy" + ) + ); + int cardinality = dimSelector.getValueCardinality(); + + //index gets more rows at this point, while other thread is iterating over the cursor + try { + for (int i = 0; i < 1; i++) { + index.add( + new MapBasedInputRow( + timestamp, + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "v2" + i) + ) + ); + } + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + + // and then, cursoring continues in the other thread + while (!cursor.isDone()) { + IndexedInts row = dimSelector.getRow(); + for (int i : row) { + Assert.assertTrue(i < cardinality); + } + cursor.advance(); + } + + return null; + } + } + ), + new ArrayList<>() + ); + } }