mirror of
https://github.com/apache/druid.git
synced 2025-02-17 15:35:56 +00:00
reintroducing the safety check removed in commit-1d602be so that dim value ids are less than cardinality
This commit is contained in:
parent
703dc7a48f
commit
c544ebf25e
@ -60,6 +60,8 @@ import org.joda.time.Interval;
|
|||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -361,34 +363,43 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||||||
final int[][] dims = currEntry.getKey().getDims();
|
final int[][] dims = currEntry.getKey().getDims();
|
||||||
|
|
||||||
int[] indices = dimIndex < dims.length ? dims[dimIndex] : null;
|
int[] indices = dimIndex < dims.length ? dims[dimIndex] : null;
|
||||||
if (indices == null) {
|
|
||||||
indices = new int[0];
|
List<Integer> valsTmp = null;
|
||||||
}
|
if ((indices == null || indices.length == 0) && dimValLookup.contains(null)) {
|
||||||
// check for null entry
|
int id = dimValLookup.getId(null);
|
||||||
if (indices.length == 0 && dimValLookup.contains(null)) {
|
if (id < maxId) {
|
||||||
indices = new int[] { dimValLookup.getId(null) };
|
valsTmp = new ArrayList<>(1);
|
||||||
|
valsTmp.add(id);
|
||||||
|
}
|
||||||
|
} else if (indices != null && indices.length > 0) {
|
||||||
|
valsTmp = new ArrayList<>(indices.length);
|
||||||
|
for (int i = 0; i < indices.length; i++) {
|
||||||
|
int id = indices[i];
|
||||||
|
if (id < maxId) {
|
||||||
|
valsTmp.add(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final int[] vals = indices;
|
final List<Integer> vals = valsTmp == null ? Collections.EMPTY_LIST : valsTmp;
|
||||||
|
|
||||||
return new IndexedInts()
|
return new IndexedInts()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public int size()
|
public int size()
|
||||||
{
|
{
|
||||||
return vals.length;
|
return vals.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int get(int index)
|
public int get(int index)
|
||||||
{
|
{
|
||||||
return vals[index];
|
return vals.get(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Integer> iterator()
|
public Iterator<Integer> iterator()
|
||||||
{
|
{
|
||||||
return Ints.asList(vals).iterator();
|
return vals.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
package io.druid.segment.incremental;
|
package io.druid.segment.incremental;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.base.Suppliers;
|
import com.google.common.base.Suppliers;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
@ -46,6 +47,8 @@ import io.druid.query.topn.TopNQueryEngine;
|
|||||||
import io.druid.query.topn.TopNResultValue;
|
import io.druid.query.topn.TopNResultValue;
|
||||||
import io.druid.segment.Cursor;
|
import io.druid.segment.Cursor;
|
||||||
import io.druid.segment.DimensionSelector;
|
import io.druid.segment.DimensionSelector;
|
||||||
|
import io.druid.segment.StorageAdapter;
|
||||||
|
import io.druid.segment.data.IndexedInts;
|
||||||
import io.druid.segment.filter.SelectorFilter;
|
import io.druid.segment.filter.SelectorFilter;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
@ -54,6 +57,7 @@ import org.junit.Test;
|
|||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -376,4 +380,76 @@ public class IncrementalIndexStorageAdapterTest
|
|||||||
MapBasedRow row = (MapBasedRow) results.get(0);
|
MapBasedRow row = (MapBasedRow) results.get(0);
|
||||||
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
|
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1L), row.getEvent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCursoringAndIndexUpdationInterleaving() throws Exception
|
||||||
|
{
|
||||||
|
final IncrementalIndex index = indexCreator.createIndex();
|
||||||
|
final long timestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
index.add(
|
||||||
|
new MapBasedInputRow(
|
||||||
|
timestamp,
|
||||||
|
Lists.newArrayList("billy"),
|
||||||
|
ImmutableMap.<String, Object>of("billy", "v1" + i)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);
|
||||||
|
|
||||||
|
Sequence<Cursor> cursors = sa.makeCursors(
|
||||||
|
null, new Interval(timestamp - 60_000, timestamp + 60_000), QueryGranularity.ALL, false
|
||||||
|
);
|
||||||
|
|
||||||
|
Sequences.toList(
|
||||||
|
Sequences.map(
|
||||||
|
cursors,
|
||||||
|
new Function<Cursor, Object>()
|
||||||
|
{
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Object apply(Cursor cursor)
|
||||||
|
{
|
||||||
|
DimensionSelector dimSelector = cursor.makeDimensionSelector(
|
||||||
|
new DefaultDimensionSpec(
|
||||||
|
"billy",
|
||||||
|
"billy"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
int cardinality = dimSelector.getValueCardinality();
|
||||||
|
|
||||||
|
//index gets more rows at this point, while other thread is iterating over the cursor
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 1; i++) {
|
||||||
|
index.add(
|
||||||
|
new MapBasedInputRow(
|
||||||
|
timestamp,
|
||||||
|
Lists.newArrayList("billy"),
|
||||||
|
ImmutableMap.<String, Object>of("billy", "v2" + i)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
// and then, cursoring continues in the other thread
|
||||||
|
while (!cursor.isDone()) {
|
||||||
|
IndexedInts row = dimSelector.getRow();
|
||||||
|
for (int i : row) {
|
||||||
|
Assert.assertTrue(i < cardinality);
|
||||||
|
}
|
||||||
|
cursor.advance();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
new ArrayList<>()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user