diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
index 910371a5669..e4faea1c069 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.input;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRowSchema;
@@ -50,7 +51,7 @@ public class DruidSegmentInputFormatTest
     DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
     InputEntityReader reader = format.createReader(
         INPUT_ROW_SCHEMA,
-        DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null),
+        DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null, ImmutableList.of("s", "d"), ImmutableList.of("cnt", "met_s")),
         null
     );
     Assert.assertTrue(reader instanceof DruidSegmentReader);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 1638e79e9a7..ff4b50cee48 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
 import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.hll.HyperLogLogCollector;
@@ -41,8 +42,11 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.filter.NotDimFilter;
+import org.apache.druid.query.filter.OrDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
@@ -80,67 +84,44 @@ public class DruidSegmentReaderTest extends NullHandlingTest
   private File segmentDirectory;
 
   private final IndexIO indexIO = TestHelper.getTestIndexIO();
+  private DimensionsSpec dimensionsSpec;
+  private List<AggregatorFactory> metrics;
+  private List<InputRow> rows;
 
   @Before
   public void setUp() throws IOException
   {
     // Write a segment with two rows in it, with columns: s (string), d (double), cnt (long), met_s (complex).
-    final IncrementalIndex incrementalIndex =
-        IndexBuilder.create()
-                    .schema(
-                        new IncrementalIndexSchema.Builder()
-                            .withDimensionsSpec(
-                                new DimensionsSpec(
-                                    ImmutableList.of(
-                                        StringDimensionSchema.create("s"),
-                                        new DoubleDimensionSchema("d")
-                                    )
-                                )
-                            )
-                            .withMetrics(
-                                new CountAggregatorFactory("cnt"),
-                                new HyperUniquesAggregatorFactory("met_s", "s")
-                            )
-                            .withRollup(false)
-                            .build()
-                    )
-                    .rows(
-                        ImmutableList.of(
-                            new MapBasedInputRow(
-                                DateTimes.of("2000"),
-                                ImmutableList.of("s", "d"),
-                                ImmutableMap.<String, Object>builder()
-                                            .put("s", "foo")
-                                            .put("d", 1.23)
-                                            .build()
-                            ),
-                            new MapBasedInputRow(
-                                DateTimes.of("2000T01"),
-                                ImmutableList.of("s", "d"),
-                                ImmutableMap.<String, Object>builder()
-                                            .put("s", "bar")
-                                            .put("d", 4.56)
-                                            .build()
-                            )
-                        )
-                    )
-                    .buildIncrementalIndex();
+    dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            StringDimensionSchema.create("strCol"),
+            new DoubleDimensionSchema("dblCol")
+        )
+    );
+    metrics = ImmutableList.of(
+        new CountAggregatorFactory("cnt"),
+        new HyperUniquesAggregatorFactory("met_s", "strCol")
+    );
+    rows = ImmutableList.of(
+        new MapBasedInputRow(
+            DateTimes.of("2000"),
+            ImmutableList.of("strCol", "dblCol"),
+            ImmutableMap.<String, Object>builder()
+                        .put("strCol", "foo")
+                        .put("dblCol", 1.23)
+                        .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.of("2000T01"),
+            ImmutableList.of("strCol", "dblCol"),
+            ImmutableMap.<String, Object>builder()
+                        .put("strCol", "bar")
+                        .put("dblCol", 4.56)
+                        .build()
+        )
+    );
 
-    segmentDirectory = temporaryFolder.newFolder();
-
-    try {
-      TestHelper.getTestIndexMergerV9(
-          OnHeapMemorySegmentWriteOutMediumFactory.instance()
-      ).persist(
-          incrementalIndex,
-          segmentDirectory,
-          new IndexSpec(),
-          null
-      );
-    }
-    finally {
-      incrementalIndex.close();
-    }
+    createTestSetup();
   }
 
   @Test
@@ -152,8 +133,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -165,22 +146,22 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T").getMillis())
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("foo"))
                             .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T01").getMillis())
-                            .put("s", "bar")
-                            .put("d", 4.56d)
+                            .put("strCol", "bar")
+                            .put("dblCol", 4.56d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("bar"))
                             .build()
@@ -190,6 +171,74 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     );
   }
 
+  @Test
+  public void testReaderWhenFilteringOnLongColumn() throws IOException
+  {
+    dimensionsSpec = new DimensionsSpec(
+        ImmutableList.of(
+            new LongDimensionSchema("longCol"),
+            StringDimensionSchema.create("a"),
+            StringDimensionSchema.create("b")
+        )
+    );
+    metrics = ImmutableList.of();
+
+    List<String> columnNames = ImmutableList.of("longCol", "a", "b");
+    rows = ImmutableList.of(
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726217L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                        .put("__time", 1667115726217L)
+                        .put("longCol", 0L)
+                        .put("a", "foo1")
+                        .put("b", "bar1")
+                        .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726224L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                        .put("__time", 1667115726224L)
+                        .put("longCol", 0L)
+                        .put("a", "foo2")
+                        .put("b", "bar2")
+                        .build()
+        ),
+        new MapBasedInputRow(
+            DateTimes.utc(1667115726128L),
+            columnNames,
+            ImmutableMap.<String, Object>builder()
+                        .put("__time", 1667115726128L)
+                        .put("longCol", 5L)
+                        .put("a", "foo3")
+                        .put("b", "bar3")
+                        .build()
+        )
+    );
+
+    createTestSetup();
+
+    final DruidSegmentReader reader = new DruidSegmentReader(
+        makeInputEntityWithParams(Intervals.of("2022-10-30/2022-10-31"), columnNames, null),
+        indexIO,
+        new TimestampSpec("__time", "iso", null),
+        dimensionsSpec,
+        ColumnsFilter.all(),
+        new OrDimFilter(
+            new SelectorDimFilter("longCol", "5", null),
+            new NotDimFilter(new SelectorDimFilter("a", "foo1", null)),
+            new NotDimFilter(new SelectorDimFilter("b", "bar1", null))
+        ),
+        temporaryFolder.newFolder()
+    );
+
+    List<InputRow> expectedRows = new ArrayList<>();
+    expectedRows.add(rows.get(2));
+    expectedRows.add(rows.get(1));
+    Assert.assertEquals(expectedRows, readRows(reader));
+  }
+
   @Test
   public void testDruidTombstoneSegmentReader() throws IOException
   {
@@ -235,8 +284,8 @@
         new TimestampSpec("__time", "auto", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
@@ -248,22 +297,22 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T").getMillis())
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("foo"))
                             .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T01").getMillis())
-                            .put("s", "bar")
-                            .put("d", 4.56d)
+                            .put("strCol", "bar")
+                            .put("dblCol", 4.56d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("bar"))
                             .build()
@@ -280,7 +329,7 @@
         makeInputEntity(Intervals.of("2000/P1D")),
         indexIO,
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
-        DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of("__time", "s", "cnt", "met_s")).build(),
+        DimensionsSpec.builder().setDimensionExclusions(ImmutableList.of("__time", "strCol", "cnt", "met_s")).build(),
         ColumnsFilter.all(),
         null,
         temporaryFolder.newFolder()
     );
@@ -290,22 +339,22 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("d"),
+                ImmutableList.of("dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T").getMillis())
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("foo"))
                             .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("d"),
+                ImmutableList.of("dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T01").getMillis())
-                            .put("s", "bar")
-                            .put("d", 4.56d)
+                            .put("strCol", "bar")
+                            .put("dblCol", 4.56d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("bar"))
                             .build()
@@ -324,11 +373,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
-        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "s", "d")),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("__time", "strCol", "dblCol")),
         null,
         temporaryFolder.newFolder()
     );
@@ -337,20 +386,20 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T").getMillis())
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("2000T01"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T01").getMillis())
-                            .put("s", "bar")
-                            .put("d", 4.56d)
+                            .put("strCol", "bar")
+                            .put("dblCol", 4.56d)
                             .build()
             )
         ),
@@ -367,11 +416,11 @@
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
-        ColumnsFilter.inclusionBased(ImmutableSet.of("s", "d")),
+        ColumnsFilter.inclusionBased(ImmutableSet.of("strCol", "dblCol")),
         null,
         temporaryFolder.newFolder()
     );
@@ -380,18 +429,18 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .build()
             ),
             new MapBasedInputRow(
                 DateTimes.of("1971"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
-                            .put("s", "bar")
-                            .put("d", 4.56d)
+                            .put("strCol", "bar")
+                            .put("dblCol", 4.56d)
                             .build()
             )
         ),
@@ -408,12 +457,12 @@
         new TimestampSpec("__time", "millis", DateTimes.of("1971")),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
+                new DoubleDimensionSchema("dblCol")
             )
         ),
         ColumnsFilter.all(),
-        new SelectorDimFilter("d", "1.23", null),
+        new SelectorDimFilter("dblCol", "1.23", null),
         temporaryFolder.newFolder()
     );
@@ -421,11 +470,11 @@
         ImmutableList.of(
             new MapBasedInputRow(
                 DateTimes.of("2000"),
-                ImmutableList.of("s", "d"),
+                ImmutableList.of("strCol", "dblCol"),
                 ImmutableMap.<String, Object>builder()
                             .put("__time", DateTimes.of("2000T").getMillis())
-                            .put("s", "foo")
-                            .put("d", 1.23d)
+                            .put("strCol", "foo")
+                            .put("dblCol", 1.23d)
                             .put("cnt", 1L)
                             .put("met_s", makeHLLC("foo"))
                             .build()
@@ -441,11 +490,11 @@
     final DruidSegmentReader reader = new DruidSegmentReader(
         makeInputEntity(Intervals.of("2000/P1D")),
         indexIO,
-        new TimestampSpec("d", "posix", null),
+        new TimestampSpec("dblCol", "posix", null),
         new DimensionsSpec(
             ImmutableList.of(
-                StringDimensionSchema.create("s"),
-                new DoubleDimensionSchema("d")
+                StringDimensionSchema.create("strCol"),
StringDimensionSchema.create("strCol"), + new DoubleDimensionSchema("dblCol") ) ), ColumnsFilter.all(), @@ -457,22 +506,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest ImmutableList.of( new MapBasedInputRow( DateTimes.of("1970-01-01T00:00:01.000Z"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T").getMillis()) - .put("s", "foo") - .put("d", 1.23d) + .put("strCol", "foo") + .put("dblCol", 1.23d) .put("cnt", 1L) .put("met_s", makeHLLC("foo")) .build() ), new MapBasedInputRow( DateTimes.of("1970-01-01T00:00:04.000Z"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T01").getMillis()) - .put("s", "bar") - .put("d", 4.56d) + .put("strCol", "bar") + .put("dblCol", 4.56d) .put("cnt", 1L) .put("met_s", makeHLLC("bar")) .build() @@ -491,8 +540,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest new TimestampSpec("__time", "posix", null), new DimensionsSpec( ImmutableList.of( - StringDimensionSchema.create("s"), - new DoubleDimensionSchema("d") + StringDimensionSchema.create("strCol"), + new DoubleDimensionSchema("dblCol") ) ), ColumnsFilter.all(), @@ -504,22 +553,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest ImmutableList.of( new MapBasedInputRow( DateTimes.of("31969-04-01T00:00:00.000Z"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T").getMillis()) - .put("s", "foo") - .put("d", 1.23d) + .put("strCol", "foo") + .put("dblCol", 1.23d) .put("cnt", 1L) .put("met_s", makeHLLC("foo")) .build() ), new MapBasedInputRow( DateTimes.of("31969-05-12T16:00:00.000Z"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T01").getMillis()) - .put("s", "bar") - .put("d", 4.56d) + .put("strCol", "bar") + .put("dblCol", 4.56d) .put("cnt", 1L) .put("met_s", makeHLLC("bar")) .build() @@ -538,8 +587,8 @@ public class DruidSegmentReaderTest extends NullHandlingTest new TimestampSpec(null, null, DateTimes.of("1971")), new DimensionsSpec( ImmutableList.of( - StringDimensionSchema.create("s"), - new DoubleDimensionSchema("d") + StringDimensionSchema.create("strCol"), + new DoubleDimensionSchema("dblCol") ) ), ColumnsFilter.all(), @@ -551,22 +600,22 @@ public class DruidSegmentReaderTest extends NullHandlingTest ImmutableList.of( new MapBasedInputRow( DateTimes.of("1971"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T").getMillis()) - .put("s", "foo") - .put("d", 1.23d) + .put("strCol", "foo") + .put("dblCol", 1.23d) .put("cnt", 1L) .put("met_s", makeHLLC("foo")) .build() ), new MapBasedInputRow( DateTimes.of("1971"), - ImmutableList.of("s", "d"), + ImmutableList.of("strCol", "dblCol"), ImmutableMap.builder() .put("__time", DateTimes.of("2000T01").getMillis()) - .put("s", "bar") - .put("d", 4.56d) + .put("strCol", "bar") + .put("dblCol", 4.56d) .put("cnt", 1L) .put("met_s", makeHLLC("bar")) .build() @@ -623,10 +672,20 @@ public class DruidSegmentReaderTest extends NullHandlingTest private DruidSegmentInputEntity makeInputEntity(final Interval interval) { - return makeInputEntity(interval, segmentDirectory); + return makeInputEntity(interval, segmentDirectory, ImmutableList.of("strCol", "dblCol"), ImmutableList.of("cnt", "met_s")); } - public static DruidSegmentInputEntity 
+  private DruidSegmentInputEntity makeInputEntityWithParams(final Interval interval, final List<String> dimensions, final List<String> metrics)
+  {
+    return makeInputEntity(interval, segmentDirectory, dimensions, metrics);
+  }
+
+  public static DruidSegmentInputEntity makeInputEntity(
+      final Interval interval,
+      final File segmentDirectory,
+      final List<String> dimensions,
+      final List<String> metrics
+  )
   {
     return new DruidSegmentInputEntity(
         new SegmentCacheManager()
@@ -669,9 +728,9 @@
         },
         DataSegment.builder()
                    .dataSource("ds")
-                   .dimensions(ImmutableList.of("s", "d"))
-                   .metrics(ImmutableList.of("cnt", "met_s"))
-                   .interval(Intervals.of("2000/P1D"))
+                   .dimensions(dimensions)
+                   .metrics(metrics)
+                   .interval(interval)
                    .version("1")
                    .size(0)
                    .build(),
@@ -765,4 +824,36 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     }
     return collector;
   }
+
+  private void createTestSetup() throws IOException
+  {
+    final IncrementalIndex incrementalIndex =
+        IndexBuilder.create()
+                    .schema(
+                        new IncrementalIndexSchema.Builder()
+                            .withDimensionsSpec(dimensionsSpec)
+                            .withMetrics(metrics.toArray(new AggregatorFactory[0]))
+                            .withRollup(false)
+                            .build()
+                    )
+                    .rows(rows)
+                    .buildIncrementalIndex();
+
+    segmentDirectory = temporaryFolder.newFolder();
+
+    try {
+      TestHelper.getTestIndexMergerV9(
+          OnHeapMemorySegmentWriteOutMediumFactory.instance()
+      ).persist(
+          incrementalIndex,
+          segmentDirectory,
+          new IndexSpec(),
+          null
+      );
+    }
+    finally {
+      incrementalIndex.close();
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
index 28b3c5d2b6c..98a7ab51f98 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSupplier.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.DoubleBuffer;
@@ -82,10 +83,12 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoubles>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers = baseDoubleBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
     /**
      * doubleBuffer's position must be 0
      */
+    @Nullable
     DoubleBuffer doubleBuffer;
 
     @Override
@@ -180,7 +183,10 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoubles>
     @Override
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        doubleBuffer = null;
       }
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSupplier.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.FloatBuffer;
@@ -82,10 +83,12 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloats>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedFloatBuffers = baseFloatBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
     /**
      * floatBuffer's position must be 0
      */
+    @Nullable
     FloatBuffer floatBuffer;
 
     @Override
@@ -180,7 +183,10 @@ public class BlockLayoutColumnarFloatsSupplier implements Supplier<ColumnarFloats>
     @Override
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        floatBuffer = null;
       }
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSupplier.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.data;
 
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.ResourceHolder;
+import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.LongBuffer;
@@ -87,12 +88,15 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedLongBuffers = baseLongBuffers.singleThreaded();
 
     int currBufferNum = -1;
+    @Nullable
     ResourceHolder<ByteBuffer> holder;
+    @Nullable
     ByteBuffer buffer;
     /**
      * longBuffer's position must be 0
      */
+    @Nullable
     LongBuffer longBuffer;
 
     @Override
@@ -204,7 +208,11 @@ public class BlockLayoutColumnarLongsSupplier implements Supplier<ColumnarLongs>
     @Override
     public void close()
     {
       if (holder != null) {
+        currBufferNum = -1;
         holder.close();
+        holder = null;
+        buffer = null;
+        longBuffer = null;
       }
     }
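
The three BlockLayout*Supplier changes all harden the same per-reader buffer cache: get() reloads a block only when the requested index falls outside currBufferNum, so a close() that released the holder but left currBufferNum and the buffer references pointing at the released block allowed a reader touched again after close() to skip the reload branch and read through freed state. The new testReaderWhenFilteringOnLongColumn appears to exercise the long-column path where this surfaced. Below is a minimal standalone sketch of that caching pattern and why resetting the cache key in close() matters; all names (CachedBlockReader, blocks, blockSize) are hypothetical, not Druid's actual classes.

import java.nio.LongBuffer;

class CachedBlockReader implements AutoCloseable
{
  private final LongBuffer[] blocks; // stand-in for the decompressed block holders
  private final int blockSize;       // values per block

  private int currBlockNum = -1;     // plays the role of currBufferNum
  private LongBuffer longBuffer;     // cached view of the current block

  CachedBlockReader(LongBuffer[] blocks, int blockSize)
  {
    this.blocks = blocks;
    this.blockSize = blockSize;
  }

  long get(int index)
  {
    final int blockNum = index / blockSize;
    if (blockNum != currBlockNum) {
      // Reload only when the read crosses into a different block.
      longBuffer = blocks[blockNum].asReadOnlyBuffer();
      currBlockNum = blockNum;
    }
    return longBuffer.get(index % blockSize);
  }

  @Override
  public void close()
  {
    // Mirrors the fix: reset the cache key and drop the reference, so any
    // use after close() falls into the reload branch instead of reading
    // through a stale buffer whose backing resource was already released.
    currBlockNum = -1;
    longBuffer = null;
  }
}

With the reset in place, a get() issued after close() repopulates the cache rather than indexing into released memory, and nulling the references makes an unexpected use-after-close fail fast instead of silently returning stale values.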