diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 5427d1173a3..c19ad780337 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -442,6 +442,17 @@ public abstract class IncrementalIndex implements Iterable, overflow = Lists.newArrayList(); } overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); + } else if (index > dims.length || dims[index] != null) { + /* + * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map, + * otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add, + * it must have been added to dimensionOrder during this InputRow. + * + * if we found an index for this dimension it means we've seen it already. If !(index > dims.length) then + * we saw it on a previous input row (this its safe to index into dims). If we found a value in + * the dims array for this index, it means we have seen this dimension already on this input row. + */ + throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); } else { dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java new file mode 100644 index 00000000000..5c182d41f58 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -0,0 +1,147 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.incremental; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.TestQueryRunners; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + */ +@RunWith(Parameterized.class) +public class IncrementalIndexTest +{ + + interface IndexCreator + { + IncrementalIndex createIndex(); + } + + private final IndexCreator indexCreator; + + public IncrementalIndexTest(IndexCreator IndexCreator) + { + this.indexCreator = IndexCreator; + } + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException + { + return Arrays.asList( + new Object[][]{ + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex() + { + return new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 + ); + } + } + + }, + { + new IndexCreator() + { + @Override + public IncrementalIndex createIndex() + { + return new OffheapIncrementalIndex( + 0, + QueryGranularity.MINUTE, + new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + TestQueryRunners.pool, + true, + 100 * 1024 * 1024 + ); + } + } + } + + } + ); + } + + @Test(expected = ISE.class) + public void testDuplicateDimensions() throws IndexSizeExceededException + { + IncrementalIndex index = indexCreator.createIndex(); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + } + + @Test(expected = ISE.class) + public void testDuplicateDimensionsFirstOccurance() throws IndexSizeExceededException + { + IncrementalIndex index = indexCreator.createIndex(); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + } + + @Test + public void controlTest() throws IndexSizeExceededException + { + IncrementalIndex index = indexCreator.createIndex(); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + } +}