Merge pull request #414 from metamx/incremental-index-synchronization-fix

Add test and fix one more issue caught by test
This commit is contained in:
fjy 2014-03-04 15:41:26 -07:00
commit 5aba6141d8
2 changed files with 94 additions and 24 deletions

View File

@ -152,14 +152,14 @@ public class IncrementalIndex implements Iterable<Row>
}
final List<String> rowDimensions = row.getDimensions();
String[][] dims = new String[dimensionOrder.size()][];
String[][] dims;
List<String[]> overflow = null;
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
synchronized (dimensionOrder) {
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
@ -175,6 +175,7 @@ public class IncrementalIndex implements Iterable<Row>
}
}
if (overflow != null) {
// Merge overflow and non-overflow
String[][] newDims = new String[dims.length + overflow.size()][];
@ -287,8 +288,9 @@ public class IncrementalIndex implements Iterable<Row>
Aggregator[] prev = facts.putIfAbsent(key, aggs);
if (prev != null) {
aggs = prev;
} else {
numEntries.incrementAndGet();
}
numEntries.incrementAndGet();
}
synchronized (this) {

View File

@ -24,18 +24,59 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import junit.framework.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
*/
public class IncrementalIndexTest
{
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
{
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("Dim1", "DiM2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
)
);
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("diM1", "dIM2"),
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4")
)
);
return index;
}
public static MapBasedInputRow getRow(long timestamp, int rowID, int dimensionCount)
{
List<String> dimensionList = new ArrayList<String>(dimensionCount);
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
for (int i = 0; i < dimensionCount; i++) {
String dimName = String.format("Dim_%d", i);
dimensionList.add(dimName);
builder.put(dimName, dimName + rowID);
}
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
}
@Test
public void testCaseInsensitivity() throws Exception
{
@ -58,25 +99,52 @@ public class IncrementalIndexTest
Assert.assertEquals(Arrays.asList("4"), row.getDimension("dim2"));
}
public static IncrementalIndex createCaseInsensitiveIndex(long timestamp)
@Test
public void testConcurrentAdd() throws Exception
{
IncrementalIndex index = new IncrementalIndex(0L, QueryGranularity.NONE, new AggregatorFactory[]{});
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("Dim1", "DiM2"),
ImmutableMap.<String, Object>of("dim1", "1", "dim2", "2", "DIM1", "3", "dIM2", "4")
)
final IncrementalIndex index = new IncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
final long timestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(threadCount);
for (int j = 0; j < threadCount; j++) {
executor.submit(
new Runnable()
{
@Override
public void run()
{
try {
for (int i = 0; i < elementsPerThread; i++) {
index.add(getRow(timestamp + i, i, dimensionCount));
}
}
catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
}
);
}
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
index.add(
new MapBasedInputRow(
timestamp,
Arrays.asList("diM1", "dIM2"),
ImmutableMap.<String, Object>of("Dim1", "1", "DiM2", "2", "dim1", "3", "dim2", "4")
)
);
return index;
Assert.assertEquals(dimensionCount, index.getDimensions().size());
Assert.assertEquals(elementsPerThread, index.size());
Iterator<Row> iterator = index.iterator();
int curr = 0;
while (iterator.hasNext()) {
Row row = iterator.next();
Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
Assert.assertEquals(Float.valueOf(threadCount), row.getFloatMetric("count"));
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
}
}