IncrementalIndex throws exception if limit exceeded

- For now, uses a hardcoded ratio of aggregation buffer to time-and-dims buffer sizes
- canAppendRow is a workaround for the realtime index, since the
Firehose currently has no way of rolling back the last event in
case of error
- canAppendRow needs a fudge factor: there is a race between checking
whether we can add a row and actually adding it, because of the way MapDB
reports its size (see the sketch after the commit metadata below).
Xavier Léauté 2014-12-04 10:52:44 -08:00
parent 18234a2f00
commit 7cd45a6e1f
20 changed files with 230 additions and 79 deletions
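The net effect on callers, as a minimal sketch. The wrapper class, the persistAndStartNewIndex() helper, and the System.out logging are illustrative only; IncrementalIndex.add, canAppendRow, getOutOfRowsReason and IndexSizeExceededException are the pieces this commit actually introduces, and the two styles combined here correspond to the batch-style pre-check (IndexGeneratorJob) and the realtime-style catch (RealtimeManager) shown in the diffs below.

import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;

// Hypothetical caller, not part of the patch.
class IndexLimitSketch
{
  private final IncrementalIndex<?> index;

  IndexLimitSketch(IncrementalIndex<?> index)
  {
    this.index = index;
  }

  int addOrPersist(InputRow row) throws IndexSizeExceededException
  {
    // Batch path: check before adding; getOutOfRowsReason() says which limit was hit.
    if (!index.canAppendRow()) {
      System.out.println(index.getOutOfRowsReason());
      persistAndStartNewIndex();
    }
    try {
      // Realtime path: add() may still throw, because canAppendRow() races with MapDB growth.
      return index.add(row);
    }
    catch (IndexSizeExceededException e) {
      persistAndStartNewIndex();
      throw e; // the Firehose cannot roll the event back, so the caller drops it
    }
  }

  private void persistAndStartNewIndex()
  {
    // Illustration only: stands in for persisting the full index and starting a fresh one.
  }
}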

View File

@ -40,6 +40,8 @@ public class HadoopTuningConfig implements TuningConfig
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@ -57,7 +59,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
false,
DEFAULT_BUFFER_SIZE
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO
);
}
@ -75,6 +78,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
@JsonCreator
public HadoopTuningConfig(
@ -91,7 +95,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
)
{
this.workingPath = workingPath == null ? null : workingPath;
@ -110,6 +115,7 @@ public class HadoopTuningConfig implements TuningConfig
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
}
@JsonProperty
@ -194,6 +200,12 @@ public class HadoopTuningConfig implements TuningConfig
return bufferSize;
}
@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -210,7 +222,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
@ -230,7 +243,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
@ -250,7 +264,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
}
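To make the new aggregationBufferRatio concrete: with the defaults above (128 MiB total buffer, ratio 0.5), the IndexGeneratorJob hunk below hands half of the buffer to the off-heap aggregators and leaves the rest for the MapDB-backed time-and-dims maps. A small self-contained illustration of that arithmetic; the class and variable framing is mine, the values and the cast come from the diff.

public class AggBufferSplitExample
{
  public static void main(String[] args)
  {
    final int maxTotalBufferSize = 128 * 1024 * 1024;  // DEFAULT_BUFFER_SIZE
    final float aggregationBufferRatio = 0.5f;         // DEFAULT_AGG_BUFFER_RATIO

    // Same computation IndexGeneratorJob performs before constructing OffheapIncrementalIndex.
    final int aggregationBufferSize = (int) ((double) maxTotalBufferSize * aggregationBufferRatio);

    System.out.printf("aggregation buffer:     %,d bytes%n", aggregationBufferSize);                       // 67,108,864
    System.out.printf("left for time-and-dims: %,d bytes%n", maxTotalBufferSize - aggregationBufferSize);  // 67,108,864
  }
}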

View File

@ -348,7 +348,8 @@ public class IndexGeneratorJob implements Jobby
int numRows = index.add(inputRow);
++lineCount;
if (index.isFull()) {
if (!index.canAppendRow()) {
log.info(index.getOutOfRowsReason());
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
@ -639,11 +640,14 @@ public class IndexGeneratorJob implements Jobby
.withMetrics(aggs)
.build();
if (tuningConfig.isIngestOffheap()) {
final int maxTotalBufferSize = tuningConfig.getBufferSize();
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
* tuningConfig.getAggregationBufferRatio());
return new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(tuningConfig.getBufferSize()),
new OffheapBufferPool(aggregationBufferSize),
true,
tuningConfig.getBufferSize()
maxTotalBufferSize
);
} else {
return new OnheapIncrementalIndex(

View File

@ -183,6 +183,7 @@ public class HadoopDruidIndexerConfigTest
false,
false,
false,
null,
null
)
);

View File

@ -38,6 +38,7 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -107,7 +108,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {

View File

@ -294,7 +294,7 @@ public class RealtimeIndexTask extends AbstractTask
fireDepartment.getMetrics().incrementProcessed();
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) {
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}

View File

@ -31,6 +31,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
@ -95,7 +96,7 @@ public class GroupByQueryHelper
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
Integer.MAX_VALUE
config.getMaxResults()
);
}
@ -106,9 +107,10 @@ public class GroupByQueryHelper
{
if (in instanceof Row) {
if (accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions))
> config.getMaxResults()) {
throw new ISE("Computation exceeds maxRows limit[%s]", config.getMaxResults());
try {
accumulated.add(Rows.toCaseInsensitiveInputRow((Row) in, dimensions));
} catch(IndexSizeExceededException e) {
throw new ISE(e.getMessage());
}
} else {
throw new ISE("Unable to accumulate something of type [%s]", in.getClass());

View File

@ -314,6 +314,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public abstract boolean canAppendRow();
public abstract String getOutOfRowsReason();
protected abstract DimDim makeDimDim(String dimension);
protected abstract AggregatorType[] initAggs(
@ -329,7 +333,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
);
) throws IndexSizeExceededException;
protected abstract AggregatorType[] getAggsForRow(int rowOffset);
@ -371,7 +375,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
@ -720,6 +724,4 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
) + '}';
}
}
public abstract boolean isFull();
}

View File

@ -0,0 +1,44 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.incremental;
import java.io.IOException;
public class IndexSizeExceededException extends IOException
{
public IndexSizeExceededException()
{
}
public IndexSizeExceededException(String message)
{
super(message);
}
public IndexSizeExceededException(String message, Throwable cause)
{
super(message, cause);
}
public IndexSizeExceededException(Throwable cause)
{
super(cause);
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.incremental;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import io.druid.collections.ResourceHolder;
@ -38,6 +39,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
@ -51,6 +53,22 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
private static final long STORE_CHUNK_SIZE;
static
{
// MapDB allocates memory in chunks. We need to know CHUNK_SIZE
// in order to get a crude estimate of how much more direct memory
// might be used when adding an additional row.
try {
Field field = Store.class.getDeclaredField("CHUNK_SIZE");
field.setAccessible(true);
STORE_CHUNK_SIZE = field.getLong(null);
} catch(NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Unable to determine MapDB store chunk size", e);
}
}
private final ResourceHolder<ByteBuffer> bufferHolder;
private final DB db;
@ -58,18 +76,24 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final int sizeLimit;
private final int maxTotalBufferSize;
private String outOfRowsReason = null;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics,
int sizeLimit
int maxTotalBufferSize
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
this.bufferHolder = bufferPool.take();
Preconditions.checkArgument(
maxTotalBufferSize > bufferHolder.get().limit(),
"Maximum total buffer size must be greater than aggregation buffer size"
);
final AggregatorFactory[] metrics = incrementalIndexSchema.getMetrics();
this.aggPositionOffsets = new int[metrics.length];
@ -94,7 +118,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
this.sizeLimit = sizeLimit;
this.maxTotalBufferSize = maxTotalBufferSize;
}
public OffheapIncrementalIndex(
@ -103,7 +127,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics,
int sizeLimit
int maxTotalBufferSize
)
{
this(
@ -113,7 +137,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.build(),
bufferPool,
deserializeComplexMetrics,
sizeLimit
maxTotalBufferSize
);
}
@ -154,20 +178,21 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
)
) throws IndexSizeExceededException
{
final BufferAggregator[] aggs = getAggs();
Integer rowOffset;
synchronized (this) {
if (!facts.containsKey(key)) {
if (!canAppendRow(false)) {
throw new IndexSizeExceededException(getOutOfRowsReason());
}
}
rowOffset = totalAggSize * numEntries.get();
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
} else {
if (rowOffset + totalAggSize > bufferHolder.get().limit()) {
facts.remove(key);
throw new ISE("Buffer full, cannot add more rows! Current rowSize[%,d].", numEntries.get());
}
numEntries.incrementAndGet();
for (int i = 0; i < aggs.length; i++) {
aggs[i].init(bufferHolder.get(), getMetricPosition(rowOffset, i));
@ -184,6 +209,34 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
return numEntries.get();
}
public boolean canAppendRow() {
return canAppendRow(true);
}
private boolean canAppendRow(boolean includeFudgeFactor)
{
// there is a race condition when checking the current MapDB size:
// when canAppendRow() is called after adding a row it may return true, but on a subsequent call
// to addToFacts that may no longer be the case, because the MapDB size may have changed.
// so we add this fudge factor, hoping that it will be enough.
final int aggBufferSize = bufferHolder.get().limit();
if ((size() + 1) * totalAggSize > aggBufferSize) {
outOfRowsReason = String.format("Maximum aggregation buffer limit reached [%d bytes].", aggBufferSize);
return false;
}
// hopefully both MapDBs will grow by at most STORE_CHUNK_SIZE each when we add the next row.
if (getCurrentSize() + totalAggSize + 2 * STORE_CHUNK_SIZE + (includeFudgeFactor ? STORE_CHUNK_SIZE : 0) > maxTotalBufferSize) {
outOfRowsReason = String.format("Maximum time and dimension buffer limit reached [%d bytes].", maxTotalBufferSize - aggBufferSize);
return false;
}
return true;
}
public String getOutOfRowsReason() {
return outOfRowsReason;
}
@Override
protected BufferAggregator[] getAggsForRow(int rowOffset)
{
@ -219,21 +272,14 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
try {
bufferHolder.close();
Store.forDB(db).close();
Store.forDB(factsDb).close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
/**
* @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows.
*/
public boolean isFull()
{
return getCurrentSize() > sizeLimit;
}
private int getMetricPosition(int rowOffset, int metricIndex)
{
return rowOffset + aggPositionOffsets[metricIndex];
@ -429,6 +475,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
return Store.forDB(db).getCurrSize() +
Store.forDB(factsDb).getCurrSize()
// Size of aggregators
+ (size() + 1) * totalAggSize;
+ size() * totalAggSize;
}
}
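For readers skimming the hunk above, the capacity check boils down to two comparisons. Here is a condensed, standalone restatement; the parameter names and static framing are mine, while the logic mirrors canAppendRow(boolean) and getCurrentSize() as changed above.

public class OffheapCapacitySketch
{
  // Mirrors OffheapIncrementalIndex.canAppendRow(boolean) with the instance state passed in explicitly.
  static boolean canAppendRow(
      int numRows,              // size(): rows currently in the index
      int totalAggSize,         // bytes of aggregator state per row
      int aggBufferSize,        // bufferHolder.get().limit()
      long mapDbCurrentSize,    // Store.forDB(db).getCurrSize() + Store.forDB(factsDb).getCurrSize()
      long storeChunkSize,      // MapDB Store.CHUNK_SIZE, obtained via reflection above
      int maxTotalBufferSize,
      boolean includeFudgeFactor
  )
  {
    // 1) Room for one more row of aggregator state in the fixed off-heap buffer?
    if ((numRows + 1) * totalAggSize > aggBufferSize) {
      return false;
    }
    // 2) Assume each of the two MapDB stores may grow by one chunk for the next row;
    //    the optional extra chunk is the fudge factor covering the check-then-add race.
    final long currentSize = mapDbCurrentSize + (long) numRows * totalAggSize; // getCurrentSize()
    final long projected = currentSize + totalAggSize + 2 * storeChunkSize
                           + (includeFudgeFactor ? storeChunkSize : 0);
    return projected <= maxTotalBufferSize;
  }
}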

View File

@ -45,6 +45,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private final List<Aggregator[]> aggList = Lists.newArrayList();
private final int maxRowCount;
private String outOfRowsReason = null;
public OnheapIncrementalIndex(IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, int maxRowCount)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
@ -123,11 +125,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
AtomicInteger numEntries,
TimeAndDims key,
ThreadLocal<InputRow> in
)
) throws IndexSizeExceededException
{
Integer rowOffset;
synchronized (this) {
rowOffset = numEntries.get();
if(rowOffset >= maxRowCount && !facts.containsKey(key)) {
throw new IndexSizeExceededException("Maximum number of rows reached");
}
final Integer prev = facts.putIfAbsent(key, rowOffset);
if (prev != null) {
rowOffset = prev;
@ -156,6 +161,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
return numEntries.get();
}
@Override
public boolean canAppendRow()
{
final boolean canAdd = size() < maxRowCount;
if(!canAdd) {
outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
}
return canAdd;
}
@Override
public String getOutOfRowsReason()
{
return outOfRowsReason;
}
@Override
protected Aggregator[] getAggsForRow(int rowOffset)
{
@ -174,12 +195,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
return aggList.get(rowOffset)[aggOffset].getFloat();
}
@Override
public boolean isFull()
{
return size() >= maxRowCount;
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{

View File

@ -40,6 +40,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import io.druid.timeline.TimelineObjectHolder;
@ -145,7 +146,11 @@ public class SchemalessIndex
}
}
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
try {
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
} catch(IndexSizeExceededException e) {
Throwables.propagate(e);
}
count++;
}

View File

@ -27,9 +27,10 @@ import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -112,7 +113,7 @@ public class IncrementalIndexTest
}
}
public static void populateIndex(long timestamp, IncrementalIndex index)
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
{
index.add(
new MapBasedInputRow(
@ -203,14 +204,14 @@ public class IncrementalIndexTest
while (iterator.hasNext()) {
Row row = iterator.next();
Assert.assertEquals(timestamp + curr, row.getTimestampFromEpoch());
Assert.assertEquals(Float.valueOf(threadCount), row.getFloatMetric("count"));
Assert.assertEquals(Float.valueOf(threadCount), (Float)row.getFloatMetric("count"));
curr++;
}
Assert.assertEquals(elementsPerThread, curr);
}
@Test
public void testOffheapIndexIsFull()
public void testOffheapIndexIsFull() throws IndexSizeExceededException
{
OffheapIncrementalIndex index = new OffheapIncrementalIndex(
0L,
@ -218,12 +219,12 @@ public class IncrementalIndexTest
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true,
2 * 1024 * 1024
(10 + 2) * 1024 * 1024
);
int rowCount = 0;
for (int i = 0; i < 500; i++) {
rowCount = index.add(getRow(System.currentTimeMillis(), i, 100));
if (index.isFull()) {
if (!index.canAppendRow()) {
break;
}
}

View File

@ -73,6 +73,7 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterBonusTest
{
public static final int NUM_POINTS = 5000;
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
@ -126,7 +127,7 @@ public class SpatialFilterBonusTest
)
).build(),
false,
1000
NUM_POINTS
);
theIndex.add(
new MapBasedInputRow(
@ -203,7 +204,7 @@ public class SpatialFilterBonusTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 6; i < NUM_POINTS; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -258,7 +259,7 @@ public class SpatialFilterBonusTest
).build(),
false,
1000
NUM_POINTS
);
IncrementalIndex second = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -277,7 +278,7 @@ public class SpatialFilterBonusTest
)
).build(),
false,
1000
NUM_POINTS
);
IncrementalIndex third = new OnheapIncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
@ -297,7 +298,7 @@ public class SpatialFilterBonusTest
).build(),
false,
1000
NUM_POINTS
);
@ -376,7 +377,7 @@ public class SpatialFilterBonusTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 6; i < NUM_POINTS; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),

View File

@ -73,6 +73,7 @@ import java.util.Random;
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
public static final int NUM_POINTS = 5000;
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
@ -127,8 +128,9 @@ public class SpatialFilterTest
)
).build(),
false,
1000
NUM_POINTS
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -234,7 +236,7 @@ public class SpatialFilterTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 8; i < NUM_POINTS; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
@ -338,7 +340,7 @@ public class SpatialFilterTest
)
).build(),
false,
1000
NUM_POINTS
);
@ -447,7 +449,7 @@ public class SpatialFilterTest
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
for (int i = 8; i < NUM_POINTS; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),

View File

@ -247,7 +247,7 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
public void testResetSanity() {
public void testResetSanity() throws IOException{
IncrementalIndex index = indexCreator.createIndex();
DateTime t = DateTime.now();
@ -295,7 +295,7 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
public void testSingleValueTopN()
public void testSingleValueTopN() throws IOException
{
IncrementalIndex index = indexCreator.createIndex();
DateTime t = DateTime.now();

View File

@ -40,6 +40,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
@ -197,12 +198,19 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
int currCount = plumber.add(inputRow);
if (currCount == -1) {
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow) == -1;
} catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (System.currentTimeMillis() > nextFlush) {
if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
@ -210,7 +218,7 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && sink.isFull()) || System.currentTimeMillis() > nextFlush) {
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime.plumber;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
public interface Plumber
{
@ -36,7 +37,7 @@ public interface Plumber
* @return - positive numbers indicate how many summarized rows exist in the index for that timestamp,
* -1 means a row was thrown away because it was too late
*/
public int add(InputRow row);
public int add(InputRow row) throws IndexSizeExceededException;
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**

View File

@ -38,6 +38,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -158,7 +159,7 @@ public class RealtimePlumber implements Plumber
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
final Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {

View File

@ -31,6 +31,7 @@ import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.indexing.DataSchema;
@ -112,7 +113,7 @@ public class Sink implements Iterable<FireHydrant>
return currHydrant;
}
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
if (currHydrant == null) {
throw new IAE("No currHydrant but given row[%s]", row);
@ -127,6 +128,13 @@ public class Sink implements Iterable<FireHydrant>
}
}
public boolean canAppendRow()
{
synchronized (currHydrant) {
return currHydrant != null && currHydrant.getIndex().canAppendRow();
}
}
public boolean isEmpty()
{
synchronized (hydrantLock) {
@ -234,11 +242,4 @@ public class Sink implements Iterable<FireHydrant>
", schema=" + schema +
'}';
}
public boolean isFull()
{
synchronized (currHydrant){
return currHydrant != null && currHydrant.getIndex().isFull();
}
}
}

View File

@ -34,6 +34,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -267,7 +268,7 @@ public class RealtimeManagerTest
}
@Override
public int add(InputRow row)
public int add(InputRow row) throws IndexSizeExceededException
{
if (row == null) {
return -1;