mirror of https://github.com/apache/druid.git
Add test and fix one more issue caught by test
This commit is contained in:
parent
eb7bc28f41
commit
4f06dd5a7e
|
@ -152,14 +152,14 @@ public class IncrementalIndex implements Iterable<Row>
|
|||
}
|
||||
|
||||
final List<String> rowDimensions = row.getDimensions();
|
||||
String[][] dims = new String[dimensionOrder.size()][];
|
||||
|
||||
String[][] dims;
|
||||
List<String[]> overflow = null;
|
||||
synchronized (dimensionOrder) {
|
||||
dims = new String[dimensionOrder.size()][];
|
||||
for (String dimension : rowDimensions) {
|
||||
dimension = dimension.toLowerCase();
|
||||
List<String> dimensionValues = row.getDimension(dimension);
|
||||
|
||||
synchronized (dimensionOrder) {
|
||||
Integer index = dimensionOrder.get(dimension);
|
||||
if (index == null) {
|
||||
dimensionOrder.put(dimension, dimensionOrder.size());
|
||||
|
@ -175,6 +175,7 @@ public class IncrementalIndex implements Iterable<Row>
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (overflow != null) {
|
||||
// Merge overflow and non-overflow
|
||||
String[][] newDims = new String[dims.length + overflow.size()][];
|
||||
|
@ -287,9 +288,10 @@ public class IncrementalIndex implements Iterable<Row>
|
|||
Aggregator[] prev = facts.putIfAbsent(key, aggs);
|
||||
if (prev != null) {
|
||||
aggs = prev;
|
||||
}
|
||||
} else {
|
||||
numEntries.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
in = row;
|
||||
|
|
|
@ -24,18 +24,59 @@ import io.druid.data.input.MapBasedInputRow;
|
|||
import io.druid.data.input.Row;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import junit.framework.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class IncrementalIndexTest
|
||||
{
|
||||
|
||||
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
|
||||
{
|
||||
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
|
||||
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("Dim1", "DiM2"),
|
||||
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
|
||||
)
|
||||
);
|
||||
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("diM1", "dIM2"),
|
||||
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4")
|
||||
)
|
||||
);
|
||||
return index;
|
||||
}
|
||||
|
||||
public static MapBasedInputRow getRow(long timestamp, int rowID, int dimensionCount)
|
||||
{
|
||||
List<String> dimensionList = new ArrayList<String>(dimensionCount);
|
||||
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
|
||||
for (int i = 0; i < dimensionCount; i++) {
|
||||
String dimName = String.format("Dim_%d", i);
|
||||
dimensionList.add(dimName);
|
||||
builder.put(dimName, dimName + rowID);
|
||||
}
|
||||
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCaseInsensitivity() throws Exception
|
||||
{
|
||||
|
@ -58,25 +99,52 @@ public class IncrementalIndexTest
|
|||
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
|
||||
}
|
||||
|
||||
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
|
||||
@Test
|
||||
public void testConcurrentAdd() throws Exception
|
||||
{
|
||||
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
|
||||
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("Dim1", "DiM2"),
|
||||
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
|
||||
)
|
||||
final IncrementalIndex index = new IncrementalIndex(
|
||||
0L,
|
||||
QueryGranularity.NONE,
|
||||
new AggregatorFactory[]{new CountAggregatorFactory("count")}
|
||||
);
|
||||
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Arrays.asList("diM1", "dIM2"),
|
||||
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4")
|
||||
)
|
||||
);
|
||||
return index;
|
||||
final int threadCount = 10;
|
||||
final int elementsPerThread = 200;
|
||||
final int dimensionCount = 5;
|
||||
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
|
||||
final long timestamp = System.currentTimeMillis();
|
||||
final CountDownLatch latch = new CountDownLatch(threadCount);
|
||||
for (int j = 0; j < threadCount; j++) {
|
||||
executor.submit(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
for (int i = 0; i < elementsPerThread; i++) {
|
||||
index.add(getRow(timestamp + i, i, dimensionCount));
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
|
||||
|
||||
Assert.assertEquals(dimensionCount, index.getDimensions().size());
|
||||
Assert.assertEquals(elementsPerThread, index.size());
|
||||
Iterator<Row> iterator = index.iterator();
|
||||
int curr = 0;
|
||||
while (iterator.hasNext()) {
|
||||
Row row = iterator.next();
|
||||
Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
|
||||
Assert.assertEquals(Float.valueOf(threadCount), row.getFloatMetric("count"));
|
||||
curr++;
|
||||
}
|
||||
Assert.assertEquals(elementsPerThread, curr);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue