Fixes reindexing bug with filter on long column (#13386)

* Fixes the BlockLayoutColumnarLongs close() method to nullify its internal buffer.

* Fixes the other BlockLayoutColumnar supplier close() methods to nullify their internal buffers.

* Fixes SpotBugs warnings.
Tejaswini Bandlamudi 2022-11-25 19:22:48 +05:30 committed by GitHub
parent 16385c7101
commit b091b32f21
5 changed files with 245 additions and 133 deletions
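The substance of the fix is defensive cleanup in the BlockLayoutColumnar*Supplier readers: close() now resets the current-block index and nullifies the cached buffer references, so a reader touched again after close() can no longer silently serve values out of a recycled ByteBuffer. A minimal sketch of the pattern follows; BufferHolder is a hypothetical stand-in for Druid's ResourceHolder<ByteBuffer>, and the class is a simplification, not the actual supplier code.

import javax.annotation.Nullable;
import java.nio.LongBuffer;

// Hypothetical stand-in for Druid's ResourceHolder<ByteBuffer>.
interface BufferHolder extends AutoCloseable
{
  @Override
  void close(); // returns the pooled ByteBuffer; no checked exception
}

class ColumnarLongsSketch implements AutoCloseable
{
  int currBufferNum = -1;  // index of the block currently mapped into longBuffer
  @Nullable
  BufferHolder holder;     // pooled buffer, handed back to the pool on close
  @Nullable
  LongBuffer longBuffer;   // long view over the holder's ByteBuffer

  @Override
  public void close()
  {
    if (holder != null) {
      // Reset the index first, so a later load can never match the stale
      // block number and skip remapping.
      currBufferNum = -1;
      holder.close();
      // Nullify the cached references: a reader reused after close() would
      // otherwise read from a recycled buffer.
      holder = null;
      longBuffer = null;
    }
  }
}

Resetting currBufferNum matters as much as the nulls: these readers cache the block index to avoid remapping, so a stale index could otherwise short-circuit a reload and hand back the old view.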

DruidSegmentInputFormatTest.java

@@ -19,6 +19,7 @@
 package org.apache.druid.indexing.input;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRowSchema;
@@ -50,7 +51,7 @@ public class DruidSegmentInputFormatTest
     DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
     InputEntityReader reader = format.createReader(
         INPUT_ROW_SCHEMA,
-        DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null),
+        DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null, ImmutableList.of("s", "d"), ImmutableList.of("cnt", "met_s")),
         null
     );
     Assert.assertTrue(reader instanceof DruidSegmentReader);

DruidSegmentReaderTest.java

@@ -31,6 +31,7 @@ import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.hll.HyperLogLogCollector;
@@ -41,8 +42,11 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
@@ -80,67 +84,44 @@ public class DruidSegmentReaderTest extends NullHandlingTest
   private File segmentDirectory;
   private final IndexIO indexIO = TestHelper.getTestIndexIO();
 
+  private DimensionsSpec dimensionsSpec;
+  private List<AggregatorFactory> metrics;
+  private List<InputRow> rows;
+
   @Before
   public void setUp() throws IOException
   {
     // Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).
-    final IncrementalIndex incrementalIndex =
-        IndexBuilder.create()
-                    .schema(
-                        new IncrementalIndexSchema.Builder()
-                            .withDimensionsSpec(
-                                new DimensionsSpec(
-                                    ImmutableList.of(
-                                        StringDimensionSchema.create("s"),
-                                        new DoubleDimensionSchema("d")
-                                    )
-                                )
-                            )
-                            .withMetrics(
-                                new CountAggregatorFactory("cnt"),
-                                new HyperUniquesAggregatorFactory("met_s", "s")
-                            )
-                            .withRollup(false)
-                            .build()
-                    )
-                    .rows(
-                        ImmutableList.of(
-                            new MapBasedInputRow(
-                                DateTimes.of("2000"),
-                                ImmutableList.of("s", "d"),
-                                ImmutableMap.<String, Object>builder()
-                                    .put("s", "foo")
-                                    .put("d", 1.23)
-                                    .build()
-                            ),
-                            new MapBasedInputRow(
-                                DateTimes.of("2000T01"),
-                                ImmutableList.of("s", "d"),
-                                ImmutableMap.<String, Object>builder()
-                                    .put("s", "bar")
-                                    .put("d", 4.56)
-                                    .build()
-                            )
-                        )
-                    )
-                    .buildIncrementalIndex();
-
-    segmentDirectory = temporaryFolder.newFolder();
-    try {
-      TestHelper.getTestIndexMergerV9(
-          OnHeapMemorySegmentWriteOutMediumFactory.instance()
-      ).persist(
-          incrementalIndex,
-          segmentDirectory,
-          new IndexSpec(),
-          null
-      );
-    }
-    finally {
-      incrementalIndex.close();
-    }
+    dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            StringDimensionSchema.create("strCol"),
+            new DoubleDimensionSchema("dblCol")
+        )
+    );
+    metrics = ImmutableList.of(
+        new CountAggregatorFactory("cnt"),
+        new HyperUniquesAggregatorFactory("met_s", "strCol")
+    );
+    rows = ImmutableList.of(
+        new MapBasedInputRow(
+            DateTimes.of("2000"),
+            ImmutableList.of("strCol", "dblCol"),
+            ImmutableMap.<String, Object>builder()
+                .put("strCol", "foo")
+                .put("dblCol", 1.23)
+                .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.of("2000T01"),
+            ImmutableList.of("strCol", "dblCol"),
+            ImmutableMap.<String, Object>builder()
+                .put("strCol", "bar")
+                .put("dblCol", 4.56)
+                .build()
+        )
+    );
+    createTestSetup();
   }
 
   @Test
@@ -152,8 +133,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -165,22 +146,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -190,6 +171,74 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     );
   }
+  @Test
+  public void testReaderWhenFilteringOnLongColumn() throws IOException
+  {
+    dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            new LongDimensionSchema("longCol"),
+            StringDimensionSchema.create("a"),
+            StringDimensionSchema.create("b")
+        )
+    );
+    metrics = ImmutableList.of();
+    List<String> columnNames = ImmutableList.of("longCol", "a", "b");
+    rows = ImmutableList.of(
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726217L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                .put("__time", 1667115726217L)
+                .put("longCol", 0L)
+                .put("a", "foo1")
+                .put("b", "bar1")
+                .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726224L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                .put("__time", 1667115726224L)
+                .put("longCol", 0L)
+                .put("a", "foo2")
+                .put("b", "bar2")
+                .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726128L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                .put("__time", 1667115726128L)
+                .put("longCol", 5L)
+                .put("a", "foo3")
+                .put("b", "bar3")
+                .build()
+        )
+    );
+    createTestSetup();
+
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntityWithParams(Intervals.of("2022-10-30/2022-10-31"), columnNames, null),
+        indexIO,
+        new TimestampSpec("__time", "iso", null),
+        dimensionsSpec,
+        ColumnsFilter.all(),
+        new OrDimFilter(
+            new SelectorDimFilter("longCol", "5", null),
+            new NotDimFilter(new SelectorDimFilter("a", "foo1", null)),
+            new NotDimFilter(new SelectorDimFilter("b", "bar1", null))
+        ),
+        temporaryFolder.newFolder()
+    );
+
+    List<InputRow> expectedRows = new ArrayList<>();
+    expectedRows.add(rows.get(2));
+    expectedRows.add(rows.get(1));
+    Assert.assertEquals(expectedRows, readRows(reader));
+  }
+
   @Test
   public void testDruidTombstoneSegmentReader() throws IOException
   {
@@ -235,8 +284,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "auto", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -248,22 +297,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -280,7 +329,7 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         makeInputEntity(Intervals.of("2000/P1D")),
         indexIO,
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
-        DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of("__time", "s", "cnt", "met_s")).build(),
+        DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of("__time", "strCol", "cnt", "met_s")).build(),
         ColumnsFilter.all(),
         null,
         temporaryFolder.newFolder()
@@ -290,22 +339,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("d"),
+                ImmutableList.of("dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("d"),
+                ImmutableList.of("dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -324,11 +373,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
-        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s", "d")),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "strCol", "dblCol")),
         null,
         temporaryFolder.newFolder()
     );
@@ -337,20 +386,20 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .build()
             )
         ),
@@ -367,11 +416,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
-        ColumnsFilter.inclusionBased(ImmutableSet.of("s", "d")),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("strCol", "dblCol")),
         null,
         temporaryFolder.newFolder()
     );
@@ -380,18 +429,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .build()
             )
         ),
@@ -408,12 +457,12 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
-        new SelectorDimFilter("d", "1.23", null),
+        new SelectorDimFilter("dblCol", "1.23", null),
         temporaryFolder.newFolder()
     );
@@ -421,11 +470,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
@@ -441,11 +490,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     final DruidSegmentReader reader = new DruidSegmentReader(
         makeInputEntity(Intervals.of("2000/P1D")),
         indexIO,
-        new TimestampSpec("d", "posix", null),
+        new TimestampSpec("dblCol", "posix", null),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -457,22 +506,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("1970-01-01T00:00:01.000Z"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("1970-01-01T00:00:04.000Z"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -491,8 +540,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "posix", null),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -504,22 +553,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("31969-04-01T00:00:00.000Z"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("31969-05-12T16:00:00.000Z"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -538,8 +587,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec(null, null, DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -551,22 +600,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T").getMillis())
-                    .put("s", "foo")
-                    .put("d", 1.23d)
+                    .put("strCol", "foo")
+                    .put("dblCol", 1.23d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("foo"))
                     .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                     .put("__time", DateTimes.of("2000T01").getMillis())
-                    .put("s", "bar")
-                    .put("d", 4.56d)
+                    .put("strCol", "bar")
+                    .put("dblCol", 4.56d)
                     .put("cnt", 1L)
                     .put("met_s", makeHLLC("bar"))
                     .build()
@@ -623,10 +672,20 @@ public class DruidSegmentReaderTest extends NullHandlingTest
   private DruidSegmentInputEntity makeInputEntity(final Interval interval)
   {
-    return makeInputEntity(interval, segmentDirectory);
+    return makeInputEntity(interval, segmentDirectory, ImmutableList.of("strCol", "dblCol"), ImmutableList.of("cnt", "met_s"));
   }
 
-  public static DruidSegmentInputEntity makeInputEntity(final Interval interval, final File segmentDirectory)
+  private DruidSegmentInputEntity makeInputEntityWithParams(final Interval interval, final List<String> dimensions, final List<String> metrics)
+  {
+    return makeInputEntity(interval, segmentDirectory, dimensions, metrics);
+  }
+
+  public static DruidSegmentInputEntity makeInputEntity(
+      final Interval interval,
+      final File segmentDirectory,
+      final List<String> dimensions,
+      final List<String> metrics
+  )
   {
     return new DruidSegmentInputEntity(
         new SegmentCacheManager()
@@ -669,9 +728,9 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         },
         DataSegment.builder()
                    .dataSource("ds")
-                   .dimensions(ImmutableList.of("s", "d"))
-                   .metrics(ImmutableList.of("cnt", "met_s"))
-                   .interval(Intervals.of("2000/P1D"))
+                   .dimensions(dimensions)
+                   .metrics(metrics)
+                   .interval(interval)
                    .version("1")
                    .size(0)
                    .build(),
@@ -765,4 +824,36 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     }
     return collector;
   }
+
+  private void createTestSetup() throws IOException
+  {
+    final IncrementalIndex incrementalIndex =
+        IndexBuilder.create()
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withDimensionsSpec(dimensionsSpec)
+                            .withMetrics(metrics.toArray(new AggregatorFactory[0]))
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(rows)
+                    .buildIncrementalIndex();
+
+    segmentDirectory = temporaryFolder.newFolder();
+
+    try {
+      TestHelper.getTestIndexMergerV9(
+          OnHeapMemorySegmentWriteOutMediumFactory.instance()
+      ).persist(
+          incrementalIndex,
+          segmentDirectory,
+          new IndexSpec(),
+          null
+      );
+    }
+    finally {
+      incrementalIndex.close();
+    }
+  }
 }
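The new testReaderWhenFilteringOnLongColumn above is the regression test for this commit: it persists a three-row segment containing a long dimension and reads it back through an OrDimFilter. Modeled as plain Java predicates (a hedged sketch of the filter's semantics, not Druid's DimFilter evaluation code), the filter keeps any row where longCol is 5, a is not foo1, or b is not bar1:

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch: longCol = "5" OR NOT(a = "foo1") OR NOT(b = "bar1"),
// applied to the three rows the test writes into the segment.
class OrDimFilterSketch
{
  public static void main(String[] args)
  {
    Predicate<Map<String, Object>> filter = row ->
        Long.valueOf(5L).equals(row.get("longCol"))
        || !"foo1".equals(row.get("a"))
        || !"bar1".equals(row.get("b"));

    List<Map<String, Object>> rows = List.of(
        Map.of("longCol", 0L, "a", "foo1", "b", "bar1"),  // fails every clause -> dropped
        Map.of("longCol", 0L, "a", "foo2", "b", "bar2"),  // passes both NOT clauses
        Map.of("longCol", 5L, "a", "foo3", "b", "bar3")   // passes the longCol clause
    );

    // Prints the second and third rows; only the first is filtered out.
    rows.stream().filter(filter).forEach(System.out::println);
  }
}

Only the first row fails all three clauses. The test expects rows.get(2) before rows.get(1) because the persisted segment is ordered by __time, and the third input row carries the earliest timestamp.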

BlockLayoutColumnarDoublesSupplier.java

@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.DoubleBuffer;
@@ -82,10 +83,12 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoubles>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers = baseDoubleBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
     /**
      * doubleBuffer's position must be 0
      */
+    @Nullable
     DoubleBuffer doubleBuffer;
 
     @Override
@@ -180,7 +183,10 @@
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        doubleBuffer = null;
       }
     }

BlockLayoutColumnarFloatsSupplier.java

@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.FloatBuffer;
@@ -82,10 +83,12 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloats>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedFloatBuffers = baseFloatBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
     /**
      * floatBuffer's position must be 0
      */
+    @Nullable
     FloatBuffer floatBuffer;
 
     @Override
@@ -180,7 +183,10 @@
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        floatBuffer = null;
       }
     }

BlockLayoutColumnarLongsSupplier.java

@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
 
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.LongBuffer;
@@ -123,11 +124,14 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedLongBuffers = baseLongBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
+    @Nullable
     ByteBuffer buffer;
     /**
      * longBuffer's position must be 0
      */
+    @Nullable
     LongBuffer longBuffer;
 
     @Override
@@ -204,7 +208,11 @@
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        buffer = null;
+        longBuffer = null;
       }
     }