fix #2991: race condition in OnheapIncrementalIndex#addToFacts (#3002)

* fix #2991: race condition in OnheapIncrementalIndex#addToFacts

* add missing header

* handle parseExceptions when first doing first agg
This commit is contained in:
Kurt Young 2016-05-26 10:05:46 +08:00 committed by Fangjin Yang
parent b72c54c4f8
commit b5bd406597
2 changed files with 128 additions and 13 deletions

View File

@ -170,20 +170,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
if (null != priorIndex) { if (null != priorIndex) {
aggs = concurrentGet(priorIndex); aggs = concurrentGet(priorIndex);
doAggregate(aggs, rowContainer, row, reportParseExceptions);
} else { } else {
aggs = new Aggregator[metrics.length]; aggs = new Aggregator[metrics.length];
factorizeAggs(metrics, aggs, rowContainer, row);
rowContainer.set(row); doAggregate(aggs, rowContainer, row, reportParseExceptions);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
selectors.get(agg.getName())
);
}
rowContainer.set(null);
final Integer rowIndex = indexIncrement.getAndIncrement(); final Integer rowIndex = indexIncrement.getAndIncrement();
concurrentSet(rowIndex, aggs); concurrentSet(rowIndex, aggs);
// Last ditch sanity checks // Last ditch sanity checks
@ -196,12 +189,38 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
} else { } else {
// We lost a race // We lost a race
aggs = concurrentGet(prev); aggs = concurrentGet(prev);
doAggregate(aggs, rowContainer, row, reportParseExceptions);
// Free up the misfire // Free up the misfire
concurrentRemove(rowIndex); concurrentRemove(rowIndex);
// This is expected to occur ~80% of the time in the worst scenarios // This is expected to occur ~80% of the time in the worst scenarios
} }
} }
return numEntries.get();
}
private void factorizeAggs(
AggregatorFactory[] metrics,
Aggregator[] aggs,
ThreadLocal<InputRow> rowContainer,
InputRow row
)
{
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(selectors.get(agg.getName()));
}
rowContainer.set(null);
}
private void doAggregate(
Aggregator[] aggs,
ThreadLocal<InputRow> rowContainer,
InputRow row,
boolean reportParseExceptions
)
{
rowContainer.set(row); rowContainer.set(row);
for (Aggregator agg : aggs) { for (Aggregator agg : aggs) {
@ -221,9 +240,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
} }
rowContainer.set(null); rowContainer.set(null);
return numEntries.get();
} }
protected Aggregator[] concurrentGet(int offset) protected Aggregator[] concurrentGet(int offset)

View File

@ -0,0 +1,99 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class OnheapIncrementalIndexTest
{
private static final int MAX_ROWS = 100000;
@Test
public void testMultithreadAddFacts() throws Exception
{
final OnheapIncrementalIndex index = new OnheapIncrementalIndex(
0,
QueryGranularities.MINUTE,
new AggregatorFactory[]{new LongMaxAggregatorFactory("max", "max")},
MAX_ROWS
);
final Random random = new Random();
final int addThreadCount = 2;
Thread[] addThreads = new Thread[addThreadCount];
for (int i = 0; i < addThreadCount; ++i) {
addThreads[i] = new Thread(new Runnable()
{
@Override
public void run()
{
try {
for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) {
index.add(new MapBasedInputRow(
0,
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", random.nextLong(), "max", 1)
));
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
});
addThreads[i].start();
}
final AtomicInteger checkFailedCount = new AtomicInteger(0);
Thread checkThread = new Thread(new Runnable()
{
@Override
public void run()
{
while (!Thread.interrupted()) {
for (int row : index.getFacts().values()) {
if (index.getMetricLongValue(row, 0) != 1) {
checkFailedCount.addAndGet(1);
}
}
}
}
});
checkThread.start();
for (int i = 0; i < addThreadCount; ++i) {
addThreads[i].join();
}
checkThread.interrupt();
Assert.assertEquals(0, checkFailedCount.get());
}
}