diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java new file mode 100644 index 00000000000..9e3387da8ac --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.RowAdapter; +import org.apache.druid.segment.RowBasedSegment; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory; +import org.apache.druid.timeline.SegmentId; + +import java.util.Map; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +/** + * A {@link org.apache.druid.segment.Segment} that is based on a {@link LookupExtractor}. Allows direct + * querying of lookups. The lookup must support {@link LookupExtractor#iterable()}. 
+ */ +public class LookupSegment extends RowBasedSegment> +{ + private static final Map ROW_SIGNATURE = ImmutableMap.of( + LookupColumnSelectorFactory.KEY_COLUMN, ValueType.STRING, + LookupColumnSelectorFactory.VALUE_COLUMN, ValueType.STRING + ); + + public LookupSegment(final String lookupName, final LookupExtractorFactory lookupExtractorFactory) + { + super( + SegmentId.dummy(lookupName), + () -> { + final LookupExtractor extractor = lookupExtractorFactory.get(); + + if (!extractor.canIterate()) { + throw new ISE("Cannot iterate lookup[%s]", lookupExtractorFactory); + } + + return extractor.iterable().iterator(); + }, + new RowAdapter>() + { + @Override + public ToLongFunction> timestampFunction() + { + // No timestamps for lookups. + return row -> 0L; + } + + @Override + public Function, Object> columnFunction(String columnName) + { + if (LookupColumnSelectorFactory.KEY_COLUMN.equals(columnName)) { + return Map.Entry::getKey; + } else if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(columnName)) { + return Map.Entry::getValue; + } else { + return row -> null; + } + } + }, + ROW_SIGNATURE + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java index 7f338e64e7a..32936b64942 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java @@ -87,20 +87,31 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } @Nullable - public static ColumnCapabilities getColumnCapabilities( + static ColumnCapabilities getColumnCapabilities( final Map rowSignature, final String columnName ) { if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) { // TIME_COLUMN_NAME is handled specially; override the provided rowSignature. 
- return new ColumnCapabilitiesImpl().setType(ValueType.LONG); + return new ColumnCapabilitiesImpl().setType(ValueType.LONG).setIsComplete(true); } else { final ValueType valueType = rowSignature.get(columnName); // Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things. // Do set hasMultipleValues, because we might return multiple values. - return valueType != null ? new ColumnCapabilitiesImpl().setType(valueType).setHasMultipleValues(true) : null; + if (valueType != null) { + return new ColumnCapabilitiesImpl() + .setType(valueType) + + // Non-numeric types might have multiple values + .setHasMultipleValues(!valueType.isNumeric()) + + // Numeric types should be reported as complete, but not STRING or COMPLEX (because we don't have full info) + .setIsComplete(valueType.isNumeric()); + } else { + return null; + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java b/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java new file mode 100644 index 00000000000..a83759dac47 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedCursor.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.filter.BooleanValueMatcher; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.function.ToLongFunction; + +/** + * A {@link Cursor} that is based on a stream of objects. Generally created by a {@link RowBasedStorageAdapter}. + * + * @see RowBasedSegment#RowBasedSegment for implementation notes + */ +public class RowBasedCursor implements Cursor +{ + private final RowWalker rowWalker; + private final ToLongFunction timestampFunction; + private final Interval interval; + private final boolean descending; + private final DateTime cursorTime; + private final ColumnSelectorFactory columnSelectorFactory; + private final ValueMatcher valueMatcher; + + RowBasedCursor( + final RowWalker rowWalker, + final RowAdapter rowAdapter, + @Nullable final Filter filter, + final Interval interval, + final VirtualColumns virtualColumns, + final Granularity gran, + final boolean descending, + final Map rowSignature + ) + { + this.rowWalker = rowWalker; + this.timestampFunction = rowAdapter.timestampFunction(); + this.interval = interval; + this.descending = descending; + this.cursorTime = gran.toDateTime(interval.getStartMillis()); + this.columnSelectorFactory = virtualColumns.wrap( + RowBasedColumnSelectorFactory.create( + rowAdapter, + rowWalker::currentRow, + rowSignature, + false + ) + ); + + if (filter == null) { + this.valueMatcher = BooleanValueMatcher.of(true); + } else { + this.valueMatcher = filter.makeMatcher(this.columnSelectorFactory); + } + + 
rowWalker.skipToDateTime(descending ? interval.getEnd().minus(1) : interval.getStart(), descending); + advanceToMatchingRow(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return columnSelectorFactory; + } + + @Override + public DateTime getTime() + { + return cursorTime; + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + rowWalker.advance(); + advanceToMatchingRow(); + } + + @Override + public boolean isDone() + { + return rowWalker.isDone() || !interval.contains(timestampFunction.applyAsLong(rowWalker.currentRow())); + } + + @Override + public boolean isDoneOrInterrupted() + { + return isDone() || Thread.currentThread().isInterrupted(); + } + + @Override + public void reset() + { + rowWalker.reset(); + rowWalker.skipToDateTime(descending ? interval.getEnd().minus(1) : interval.getStart(), descending); + advanceToMatchingRow(); + } + + private void advanceToMatchingRow() + { + while (!isDone() && !valueMatcher.matches()) { + rowWalker.advance(); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java new file mode 100644 index 00000000000..58e55e0ab55 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Preconditions; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +/** + * A {@link Segment} that is based on a stream of objects. + */ +public class RowBasedSegment extends AbstractSegment +{ + private final SegmentId segmentId; + private final StorageAdapter storageAdapter; + + /** + * Create a row-based segment. + * + * The provided "rowIterable" must be in time-order according to the provided {@link RowAdapter#timestampFunction()}. + * The cursor returned by {@link RowBasedStorageAdapter#makeCursors} makes no attempt to verify this, and callers + * will expect it. + * + * The provided "rowSignature" will be used for reporting available columns and their capabilities to users of + * {@link #asStorageAdapter()}. Note that the {@link ColumnSelectorFactory} implementation returned by this segment's + * storage adapter will allow creation of selectors on any field, using the {@link RowAdapter#columnFunction} for that + * field, even if it doesn't appear in "rowSignature". 
+ * + * @param segmentId segment identifier; will be returned by {@link #getId()} + * @param rowIterable objects that comprise this segment + * @param rowAdapter adapter used for reading these objects + * @param rowSignature signature of the columns in these objects + */ + public RowBasedSegment( + final SegmentId segmentId, + final Iterable rowIterable, + final RowAdapter rowAdapter, + final Map rowSignature + ) + { + this.segmentId = Preconditions.checkNotNull(segmentId, "segmentId"); + this.storageAdapter = new RowBasedStorageAdapter<>( + rowIterable, + rowAdapter, + rowSignature + ); + } + + @Override + @Nonnull + public SegmentId getId() + { + return segmentId; + } + + @Override + @Nonnull + public Interval getDataInterval() + { + return storageAdapter.getInterval(); + } + + @Nullable + @Override + public QueryableIndex asQueryableIndex() + { + return null; + } + + @Override + @Nonnull + public StorageAdapter asStorageAdapter() + { + return storageAdapter; + } + + @Override + public void close() + { + // Do nothing. + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java new file mode 100644 index 00000000000..3901cebb280 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link StorageAdapter} that is based on a stream of objects. Generally created by a {@link RowBasedSegment}. 
+ * + * @see RowBasedSegment#RowBasedSegment for implementation notes + */ +public class RowBasedStorageAdapter implements StorageAdapter +{ + private final Iterable rowIterable; + private final RowAdapter rowAdapter; + private final Map rowSignature; + + RowBasedStorageAdapter( + final Iterable rowIterable, + final RowAdapter rowAdapter, + final Map rowSignature + ) + { + this.rowIterable = Preconditions.checkNotNull(rowIterable, "rowIterable"); + this.rowAdapter = Preconditions.checkNotNull(rowAdapter, "rowAdapter"); + this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); + } + + @Override + public Interval getInterval() + { + return Intervals.ETERNITY; + } + + @Override + public Indexed getAvailableDimensions() + { + return new ListIndexed<>(new ArrayList<>(rowSignature.keySet())); + } + + @Override + public Iterable getAvailableMetrics() + { + return Collections.emptyList(); + } + + @Override + public int getDimensionCardinality(String column) + { + return Integer.MAX_VALUE; + } + + @Override + public DateTime getMinTime() + { + return getInterval().getStart(); + } + + @Override + public DateTime getMaxTime() + { + return getInterval().getEnd().minus(1); + } + + @Nullable + @Override + public Comparable getMinValue(String column) + { + return null; + } + + @Nullable + @Override + public Comparable getMaxValue(String column) + { + return null; + } + + @Override + public Capabilities getCapabilities() + { + return Capabilities.builder().dimensionValuesSorted(false).build(); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return RowBasedColumnSelectorFactory.getColumnCapabilities(rowSignature, column); + } + + @Nullable + @Override + public String getColumnTypeName(String column) + { + final ColumnCapabilities columnCapabilities = getColumnCapabilities(column); + return columnCapabilities != null ? 
columnCapabilities.getType().toString() : null; + } + + @Override + public int getNumRows() + { + throw new UnsupportedOperationException("Cannot retrieve number of rows"); + } + + @Override + public DateTime getMaxIngestedEventTime() + { + return getMaxTime(); + } + + @Override + public Metadata getMetadata() + { + throw new UnsupportedOperationException("Cannot retrieve metadata"); + } + + @Override + public Sequence makeCursors( + @Nullable final Filter filter, + final Interval queryInterval, + final VirtualColumns virtualColumns, + final Granularity gran, + final boolean descending, + @Nullable final QueryMetrics queryMetrics + ) + { + final Interval actualInterval = queryInterval.overlap(new Interval(getMinTime(), gran.bucketEnd(getMaxTime()))); + + if (actualInterval == null) { + return Sequences.empty(); + } + + final RowWalker rowWalker = new RowWalker<>( + descending ? reverse(rowIterable) : rowIterable, + rowAdapter + ); + + final Iterable bucketIntervals = gran.getIterable(actualInterval); + + return Sequences.simple( + Iterables.transform( + descending ? reverse(bucketIntervals) : bucketIntervals, + bucketInterval -> + new RowBasedCursor<>( + rowWalker, + rowAdapter, + filter, + bucketInterval, + virtualColumns, + gran, + descending, + rowSignature + ) + ) + ); + } + + /** + * Reverse an Iterable. Will avoid materialization if possible, but, this is not always possible. + */ + private static Iterable reverse(final Iterable iterable) + { + if (iterable instanceof List) { + //noinspection unchecked, rawtypes + return Lists.reverse((List) iterable); + } else { + // Materialize and reverse the objects. Note that this means reversing non-List Iterables will use extra memory. 
+ return Lists.reverse(Lists.newArrayList(iterable)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/RowWalker.java b/processing/src/main/java/org/apache/druid/segment/RowWalker.java new file mode 100644 index 00000000000..b44802481b9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/RowWalker.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Preconditions; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.function.ToLongFunction; + +/** + * Used by {@link RowBasedStorageAdapter} and {@link RowBasedCursor} to walk through rows. It allows multiple + * {@link RowBasedCursor} to share the same underlying Iterable. 
+ */ +public class RowWalker +{ + private final Iterable rowIterable; + private final ToLongFunction timestampFunction; + + private Iterator rowIterator; + + @Nullable + private T current = null; + + RowWalker(final Iterable rowIterable, final RowAdapter rowAdapter) + { + this.rowIterable = rowIterable; + this.timestampFunction = rowAdapter.timestampFunction(); + + reset(); + } + + public boolean isDone() + { + return current == null; + } + + public T currentRow() + { + return Preconditions.checkNotNull(current, "cannot call currentRow when isDone == true"); + } + + public void advance() + { + if (rowIterator.hasNext()) { + current = rowIterator.next(); + + if (current == null) { + throw new NullPointerException("null row encountered in walker"); + } + } else { + current = null; + } + } + + public void reset() + { + rowIterator = rowIterable.iterator(); + advance(); + } + + public void skipToDateTime(final DateTime timestamp, final boolean descending) + { + while (current != null && (descending + ? timestamp.isBefore(timestampFunction.applyAsLong(current)) + : timestamp.isAfter(timestampFunction.applyAsLong(current)))) { + advance(); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java new file mode 100644 index 00000000000..d6b46d3d913 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.lookup; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.extraction.MapLookupExtractor; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.RowBasedStorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.timeline.SegmentId; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class LookupSegmentTest +{ + public static final String LOOKUP_NAME = "mylookup"; + + public static final Map LOOKUP_MAP = + ImmutableSortedMap.of( + "a", "b", + "x", "y" + ); + + public static final LookupExtractorFactory LOOKUP_EXTRACTOR_FACTORY = + new LookupExtractorFactory() + { + @Override + public boolean start() + { + return true; + } + + @Override + public boolean close() + { + return true; + } + + @Override + public boolean replaces(@Nullable LookupExtractorFactory 
other) + { + return true; + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + throw new UnsupportedOperationException("not needed for this test"); + } + + @Override + public LookupExtractor get() + { + return new MapLookupExtractor(LOOKUP_MAP, false); + } + }; + + private static final LookupSegment LOOKUP_SEGMENT = new LookupSegment(LOOKUP_NAME, LOOKUP_EXTRACTOR_FACTORY); + + @Test + public void test_getId() + { + Assert.assertEquals(SegmentId.dummy(LOOKUP_NAME), LOOKUP_SEGMENT.getId()); + } + + @Test + public void test_getDataInterval() + { + Assert.assertEquals(Intervals.ETERNITY, LOOKUP_SEGMENT.getDataInterval()); + } + + @Test + public void test_asQueryableIndex() + { + Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex()); + } + + @Test + public void test_asStorageAdapter_getAvailableDimensions() + { + Assert.assertEquals( + ImmutableList.of("k", "v"), + Lists.newArrayList(LOOKUP_SEGMENT.asStorageAdapter().getAvailableDimensions().iterator()) + ); + } + + @Test + public void test_asStorageAdapter_getAvailableMetrics() + { + Assert.assertEquals( + ImmutableList.of(), + Lists.newArrayList(LOOKUP_SEGMENT.asStorageAdapter().getAvailableMetrics()) + ); + } + + @Test + public void test_asStorageAdapter_getColumnCapabilitiesK() + { + final ColumnCapabilities capabilities = LOOKUP_SEGMENT.asStorageAdapter().getColumnCapabilities("k"); + + Assert.assertEquals(ValueType.STRING, capabilities.getType()); + + // Note: the "k" column does not actually have multiple values, but the RowBasedStorageAdapter doesn't allow + // reporting complete single-valued capabilities. It would be good to change this in the future, so query engines + // running on top of lookups can take advantage of singly-valued optimizations. 
+ Assert.assertTrue(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.isDictionaryEncoded()); + Assert.assertFalse(capabilities.isComplete()); + } + + @Test + public void test_asStorageAdapter_getColumnCapabilitiesV() + { + final ColumnCapabilities capabilities = LOOKUP_SEGMENT.asStorageAdapter().getColumnCapabilities("v"); + + // Note: the "v" column does not actually have multiple values, but the RowBasedStorageAdapter doesn't allow + // reporting complete single-valued capabilities. It would be good to change this in the future, so query engines + // running on top of lookups can take advantage of singly-valued optimizations. + Assert.assertEquals(ValueType.STRING, capabilities.getType()); + Assert.assertTrue(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.isDictionaryEncoded()); + Assert.assertFalse(capabilities.isComplete()); + } + + @Test + public void test_asStorageAdapter_getInterval() + { + Assert.assertEquals(Intervals.ETERNITY, LOOKUP_SEGMENT.asStorageAdapter().getInterval()); + } + + @Test + public void test_asStorageAdapter_getDimensionCardinalityK() + { + Assert.assertEquals(Integer.MAX_VALUE, LOOKUP_SEGMENT.asStorageAdapter().getDimensionCardinality("k")); + } + + @Test + public void test_asStorageAdapter_getDimensionCardinalityV() + { + Assert.assertEquals(Integer.MAX_VALUE, LOOKUP_SEGMENT.asStorageAdapter().getDimensionCardinality("v")); + } + + @Test + public void test_asStorageAdapter_makeCursors() + { + final Sequence cursors = LOOKUP_SEGMENT.asStorageAdapter().makeCursors( + null, + Intervals.of("1970/PT1H"), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + final Cursor cursor = Iterables.getOnlyElement(cursors.toList()); + final List> kvs = new ArrayList<>(); + + + final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k"); + final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + + while 
(!cursor.isDone()) { + kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject()))); + cursor.advanceUninterruptibly(); + } + + Assert.assertEquals( + ImmutableList.of( + Pair.of("a", "b"), + Pair.of("x", "y") + ), + kvs + ); + } + + @Test + public void test_asStorageAdapter_isRowBasedAdapter() + { + // This allows us to assume that RowBasedStorageAdapterTest is further exercising makeCursors and verifying misc. + // methods like getMinTime, getMaxTime, getMetadata, etc, without checking them explicitly in _this_ test class. + Assert.assertThat(LOOKUP_SEGMENT.asStorageAdapter(), CoreMatchers.instanceOf(RowBasedStorageAdapter.class)); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index 654a71b31dd..629c9fcbc73 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -19,28 +19,33 @@ package org.apache.druid.segment; -import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import 
org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.timeline.SegmentId; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; /** * Helps tests make segments. @@ -63,7 +68,7 @@ public class IndexBuilder private IndexBuilder() { - + // Callers must use "create". } public static IndexBuilder create() @@ -163,27 +168,13 @@ public class IndexBuilder indexMerger.merge( Lists.transform( persisted, - new Function() - { - @Override - public IndexableAdapter apply(QueryableIndex input) - { - return new QueryableIndexIndexableAdapter(input); - } - } + QueryableIndexIndexableAdapter::new ), true, Iterables.toArray( Iterables.transform( Arrays.asList(schema.getMetrics()), - new Function() - { - @Override - public AggregatorFactory apply(AggregatorFactory input) - { - return input.getCombiningFactory(); - } - } + AggregatorFactory::getCombiningFactory ), AggregatorFactory.class ), @@ -201,6 +192,40 @@ public class IndexBuilder } } + public RowBasedSegment buildRowBasedSegmentWithoutTypeSignature() + { + return new RowBasedSegment<>( + SegmentId.dummy("IndexBuilder"), + rows, + RowAdapters.standardRow(), + ImmutableMap.of() + ); + } + + public RowBasedSegment buildRowBasedSegmentWithTypeSignature() + { + // Determine row signature by building an mmapped index first. 
+ try (final QueryableIndex index = buildMMappedIndex()) { + final Map rowSignature = + index.getColumnNames().stream().collect( + Collectors.toMap( + column -> column, + column -> { + final ColumnCapabilities capabilities = index.getColumnHolder(column).getCapabilities(); + return capabilities.getType(); + } + ) + ); + + return new RowBasedSegment<>( + SegmentId.dummy("IndexBuilder"), + rows, + RowAdapters.standardRow(), + rowSignature + ); + } + } + private static IncrementalIndex buildIncrementalIndexWithRows( IncrementalIndexSchema schema, int maxRows, diff --git a/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java new file mode 100644 index 00000000000..07086c42f4b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java @@ -0,0 +1,794 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.common.guava.GuavaUtils; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; + +public class RowBasedStorageAdapterTest +{ + private static final Map ROW_SIGNATURE = + ImmutableMap.builder() + .put(ValueType.FLOAT.name(), ValueType.FLOAT) + .put(ValueType.DOUBLE.name(), ValueType.DOUBLE) + .put(ValueType.LONG.name(), ValueType.LONG) + .put(ValueType.STRING.name(), ValueType.STRING) + .put(ValueType.COMPLEX.name(), ValueType.COMPLEX) + .build(); + + private static final List>> READ_STRING = + ImmutableList.of( + cursor -> { + final BaseObjectColumnValueSelector selector = + 
cursor.getColumnSelectorFactory().makeColumnValueSelector(ValueType.STRING.name()); + return selector::getObject; + } + ); + + private static final List>> READ_TIME_AND_STRING = + ImmutableList.of( + cursor -> cursor::getTime, + cursor -> { + final BaseObjectColumnValueSelector selector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(ValueType.STRING.name()); + return selector::getObject; + } + ); + + // Processors used by the "allProcessors" tests. + private static final LinkedHashMap>> PROCESSORS = new LinkedHashMap<>(); + + @BeforeClass + public static void setUpClass() + { + NullHandling.initializeForTests(); + + PROCESSORS.clear(); + + PROCESSORS.put( + "cursor-time", + cursor -> cursor::getTime + ); + + // Read all the types as all the other types. + + for (final String valueTypeName : ROW_SIGNATURE.keySet()) { + PROCESSORS.put( + StringUtils.format("%s-float", StringUtils.toLowerCase(valueTypeName)), + cursor -> { + final BaseFloatColumnValueSelector selector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName); + return () -> { + if (selector.isNull()) { + return null; + } else { + return selector.getFloat(); + } + }; + } + ); + + PROCESSORS.put( + StringUtils.format("%s-double", StringUtils.toLowerCase(valueTypeName)), + cursor -> { + final BaseDoubleColumnValueSelector selector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName); + return () -> { + if (selector.isNull()) { + return null; + } else { + return selector.getDouble(); + } + }; + } + ); + + PROCESSORS.put( + StringUtils.format("%s-long", StringUtils.toLowerCase(valueTypeName)), + cursor -> { + final BaseLongColumnValueSelector selector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName); + return () -> { + if (selector.isNull()) { + return null; + } else { + return selector.getLong(); + } + }; + } + ); + + PROCESSORS.put( + StringUtils.format("%s-string", StringUtils.toLowerCase(valueTypeName)), + cursor -> { 
+ final DimensionSelector selector = + cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(valueTypeName)); + return selector::defaultGetObject; + } + ); + + PROCESSORS.put( + StringUtils.format("%s-object", StringUtils.toLowerCase(valueTypeName)), + cursor -> { + final BaseObjectColumnValueSelector selector = + cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName); + return selector::getObject; + } + ); + } + } + + /** + * A RowAdapter for Integers where: + * + * 1) timestampFunction returns a timestamp where the millis instant is equal to that integer as a number of hours + * since the epoch (1970). + * 2) columnFunction provides columns named after value types, where each one is equal to the integer cast to that + * type. COMPLEX and all other columns return null. + */ + private static final RowAdapter ROW_ADAPTER = + new RowAdapter() + { + @Override + public ToLongFunction timestampFunction() + { + return i -> i * Duration.standardHours(1).getMillis(); + } + + @Override + public Function columnFunction(String columnName) + { + final ValueType valueType = GuavaUtils.getEnumIfPresent(ValueType.class, columnName); + + if (valueType == null || valueType == ValueType.COMPLEX) { + return i -> null; + } else { + return i -> DimensionHandlerUtils.convertObjectToType(i, valueType); + } + } + }; + + private static RowBasedStorageAdapter createIntAdapter(final int... ints) + { + return new RowBasedStorageAdapter<>( + Arrays.stream(ints).boxed().collect(Collectors.toList()), + ROW_ADAPTER, + ROW_SIGNATURE + ); + } + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void test_getInterval() + { + final RowBasedStorageAdapter adapter = createIntAdapter(); + Assert.assertEquals(Intervals.ETERNITY, adapter.getInterval()); + } + + @Test + public void test_getAvailableDimensions() + { + final RowBasedStorageAdapter adapter = createIntAdapter(); + + // Sort them for comparison purposes. 
+ Assert.assertEquals( + ROW_SIGNATURE.keySet().stream().sorted().collect(Collectors.toList()), + Lists.newArrayList(adapter.getAvailableDimensions()).stream().sorted().collect(Collectors.toList()) + ); + } + + @Test + public void test_getAvailableMetrics() + { + final RowBasedStorageAdapter adapter = createIntAdapter(); + + Assert.assertEquals( + Collections.emptyList(), + Lists.newArrayList(adapter.getAvailableMetrics()) + ); + } + + @Test + public void test_getDimensionCardinality_knownColumns() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + // Row based adapters don't know cardinality (they don't walk their Iterables until makeCursors is called). + for (String column : ROW_SIGNATURE.keySet()) { + Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality(column)); + } + } + + @Test + public void test_getDimensionCardinality_unknownColumn() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality("unknown")); + } + + @Test + public void test_getDimensionCardinality_timeColumn() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality("__time")); + } + + @Test + public void test_getMinTime() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertEquals(Intervals.ETERNITY.getStart(), adapter.getMinTime()); + } + + @Test + public void test_getMaxTime() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertEquals(Intervals.ETERNITY.getEnd().minus(1), adapter.getMaxTime()); + } + + @Test + public void test_getMinValue() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + // Row based adapters don't know min/max values, so they always return null. + // Test both known and unknown columns. 
+ final List columns = + ImmutableList.builder().addAll(ROW_SIGNATURE.keySet()).add("unknown", "__time").build(); + + for (String column : columns) { + Assert.assertNull(column, adapter.getMinValue(column)); + } + } + + @Test + public void test_getMaxValue() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + // Row based adapters don't know min/max values, so they always return null. + // Test both known and unknown columns. + final List columns = + ImmutableList.builder().addAll(ROW_SIGNATURE.keySet()).add("unknown", "__time").build(); + + for (String column : columns) { + Assert.assertNull(column, adapter.getMaxValue(column)); + } + } + + @Test + public void test_getCapabilities() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + // Row based adapters don't know cardinality (they don't walk their Iterables until makeCursors is called). + for (String column : ROW_SIGNATURE.keySet()) { + Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality(column)); + } + } + + @Test + public void test_getColumnCapabilities_float() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.FLOAT.name()); + Assert.assertEquals(ValueType.FLOAT, capabilities.getType()); + Assert.assertFalse(capabilities.hasMultipleValues()); + Assert.assertTrue(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_double() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.DOUBLE.name()); + Assert.assertEquals(ValueType.DOUBLE, capabilities.getType()); + Assert.assertFalse(capabilities.hasMultipleValues()); + Assert.assertTrue(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_long() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final 
ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.LONG.name()); + Assert.assertEquals(ValueType.LONG, capabilities.getType()); + Assert.assertFalse(capabilities.hasMultipleValues()); + Assert.assertTrue(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_string() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.STRING.name()); + Assert.assertEquals(ValueType.STRING, capabilities.getType()); + + // Note: unlike numeric types, STRING-typed columns report that they might have multiple values and that they + // are incomplete. It would be good in the future to support some way of changing this, when it is known ahead + // of time that multi-valuedness is impossible. + Assert.assertTrue(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_complex() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.COMPLEX.name()); + + // Note: unlike numeric types, COMPLEX-typed columns report that they might have multiple values and that they + // are incomplete. 
+ Assert.assertEquals(ValueType.COMPLEX, capabilities.getType()); + Assert.assertTrue(capabilities.hasMultipleValues()); + Assert.assertFalse(capabilities.isComplete()); + } + + @Test + public void test_getColumnCapabilities_nonexistent() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertNull(adapter.getColumnCapabilities("nonexistent")); + } + + @Test + public void test_getColumnTypeName() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + for (String columnName : ROW_SIGNATURE.keySet()) { + Assert.assertEquals(columnName, ValueType.valueOf(columnName).name(), adapter.getColumnTypeName(columnName)); + } + } + + @Test + public void test_getColumnTypeName_nonexistent() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertNull(adapter.getColumnTypeName("nonexistent")); + } + + @Test + public void test_getNumRows() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + expectedException.expect(UnsupportedOperationException.class); + adapter.getNumRows(); + } + + @Test + public void test_getMaxIngestedEventTime() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + Assert.assertEquals(Intervals.ETERNITY.getEnd().minus(1), adapter.getMaxIngestedEventTime()); + } + + @Test + public void test_getMetadata() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + expectedException.expect(UnsupportedOperationException.class); + adapter.getMetadata(); + } + + @Test + public void test_makeCursors_filterOnLong() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final Sequence cursors = adapter.makeCursors( + new SelectorDimFilter(ValueType.LONG.name(), "1.0", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("1") + ), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public 
void test_makeCursors_filterOnNonexistentColumnEqualsNull() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1); + + final Sequence cursors = adapter.makeCursors( + new SelectorDimFilter("nonexistent", null, null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("0"), + ImmutableList.of("1") + ), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public void test_makeCursors_filterOnVirtualColumn() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1); + + final Sequence cursors = adapter.makeCursors( + new SelectorDimFilter("vc", "2", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.create( + ImmutableList.of( + new ExpressionVirtualColumn( + "vc", + "\"LONG\" + 1", + ValueType.LONG, + ExprMacroTable.nil() + ) + ) + ), + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("1") + ), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public void test_makeCursors_descending() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + true, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("2"), + ImmutableList.of("1"), + ImmutableList.of("0") + ), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public void test_makeCursors_intervalDoesNotMatch() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 2); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.of("2000/P1D"), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of(), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public void test_makeCursors_intervalPartiallyMatches() + { + final RowBasedStorageAdapter adapter = 
createIntAdapter(0, 1, 2); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.of("1970-01-01T01/PT1H"), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("1") + ), + walkCursors(cursors, READ_STRING) + ); + } + + @Test + public void test_makeCursors_hourGranularity() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 1, 2, 3); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.of("1970/1971"), + VirtualColumns.EMPTY, + Granularities.HOUR, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(DateTimes.of("1970-01-01T00"), "0"), + ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"), + ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"), + ImmutableList.of(DateTimes.of("1970-01-01T02"), "2"), + ImmutableList.of(DateTimes.of("1970-01-01T03"), "3") + ), + walkCursors(cursors, READ_TIME_AND_STRING) + ); + } + + @Test + public void test_makeCursors_hourGranularityWithInterval() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 1, 2, 3); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.of("1970-01-01T01/PT2H"), + VirtualColumns.EMPTY, + Granularities.HOUR, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"), + ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"), + ImmutableList.of(DateTimes.of("1970-01-01T02"), "2") + ), + walkCursors(cursors, READ_TIME_AND_STRING) + ); + } + + @Test + public void test_makeCursors_hourGranularityWithIntervalDescending() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1, 1, 2, 3); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.of("1970-01-01T01/PT2H"), + VirtualColumns.EMPTY, + Granularities.HOUR, + true, + null + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(DateTimes.of("1970-01-01T02"), "2"), + 
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"), + ImmutableList.of(DateTimes.of("1970-01-01T01"), "1") + ), + walkCursors(cursors, READ_TIME_AND_STRING) + ); + } + + @Test + public void test_makeCursors_allProcessors() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1); + + final Sequence cursors = adapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of( + Lists.newArrayList( + Intervals.ETERNITY.getStart(), + + // FLOAT + 0f, + 0d, + 0L, + "0.0", + 0f, + + // DOUBLE + 0f, + 0d, + 0L, + "0.0", + 0d, + + // LONG + 0f, + 0d, + 0L, + "0", + 0L, + + // STRING + 0f, + 0d, + 0L, + "0", + "0", + + // COMPLEX + NullHandling.defaultFloatValue(), + NullHandling.defaultDoubleValue(), + NullHandling.defaultLongValue(), + null, + null + ), + Lists.newArrayList( + Intervals.ETERNITY.getStart(), + + // FLOAT + 1f, + 1d, + 1L, + "1.0", + 1f, + + // DOUBLE + 1f, + 1d, + 1L, + "1.0", + 1d, + + // LONG + 1f, + 1d, + 1L, + "1", + 1L, + + // STRING + 1f, + 1d, + 1L, + "1", + "1", + + // COMPLEX + NullHandling.defaultFloatValue(), + NullHandling.defaultDoubleValue(), + NullHandling.defaultLongValue(), + null, + null + ) + ), + walkCursors(cursors, new ArrayList<>(PROCESSORS.values())) + ); + } + + @Test + public void test_makeCursors_filterOnNonexistentColumnEqualsNonnull() + { + final RowBasedStorageAdapter adapter = createIntAdapter(0, 1); + + final Sequence cursors = adapter.makeCursors( + new SelectorDimFilter("nonexistent", "abc", null).toFilter(), + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + Assert.assertEquals( + ImmutableList.of(), + walkCursors(cursors, new ArrayList<>(PROCESSORS.values())) + ); + } + + private static List> walkCursors( + final Sequence cursors, + final List>> processors + ) + { + return cursors.flatMap( + cursor -> { + // Gather test-value suppliers together. 
+ final List> suppliers = new ArrayList<>(); + for (Function> processor : processors) { + suppliers.add(processor.apply(cursor)); + } + + final List> retVal = new ArrayList<>(); + + while (!cursor.isDone()) { + final List row = new ArrayList<>(); + + for (Supplier supplier : suppliers) { + row.add(supplier.get()); + } + + retVal.add(row); + cursor.advanceUninterruptibly(); + } + + return Sequences.simple(retVal); + } + ).toList(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java index 2062a8dd3f1..190ab06329b 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java @@ -67,6 +67,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedColumnSelectorFactory; +import org.apache.druid.segment.RowBasedStorageAdapter; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ValueType; @@ -263,23 +264,38 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest "off-heap memory segment write-out medium", OffHeapMemorySegmentWriteOutMediumFactory.instance() ); - final Map>> finishers = ImmutableMap.of( - "incremental", - input -> { - final IncrementalIndex index = input.buildIncrementalIndex(); - return Pair.of(new IncrementalIndexStorageAdapter(index), index); - }, - "mmapped", - input -> { - final QueryableIndex index = input.buildMMappedIndex(); - return Pair.of(new QueryableIndexStorageAdapter(index), index); - }, - "mmappedMerged", - input -> { - final QueryableIndex index = input.buildMMappedMergedIndex(); - return Pair.of(new QueryableIndexStorageAdapter(index), index); - } - ); + final Map>> 
finishers = + ImmutableMap.>>builder() + .put( + "incremental", + input -> { + final IncrementalIndex index = input.buildIncrementalIndex(); + return Pair.of(new IncrementalIndexStorageAdapter(index), index); + } + ) + .put( + "mmapped", + input -> { + final QueryableIndex index = input.buildMMappedIndex(); + return Pair.of(new QueryableIndexStorageAdapter(index), index); + } + ) + .put( + "mmappedMerged", + input -> { + final QueryableIndex index = input.buildMMappedMergedIndex(); + return Pair.of(new QueryableIndexStorageAdapter(index), index); + } + ) + .put( + "rowBasedWithoutTypeSignature", + input -> Pair.of(input.buildRowBasedSegmentWithoutTypeSignature().asStorageAdapter(), () -> {}) + ) + .put( + "rowBasedWithTypeSignature", + input -> Pair.of(input.buildRowBasedSegmentWithTypeSignature().asStorageAdapter(), () -> {}) + ) + .build(); for (Map.Entry bitmapSerdeFactoryEntry : bitmapSerdeFactories.entrySet()) { for (Map.Entry segmentWriteOutMediumFactoryEntry : @@ -664,8 +680,9 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest final List expectedRows ) { - // IncrementalIndex cannot ever vectorize. - final boolean testVectorized = !(adapter instanceof IncrementalIndexStorageAdapter); + // IncrementalIndex and RowBasedSegment cannot ever vectorize. + final boolean testVectorized = + !(adapter instanceof IncrementalIndexStorageAdapter) && !(adapter instanceof RowBasedStorageAdapter); assertFilterMatches(filter, expectedRows, testVectorized); }