diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java index 2a29db5e6c0..3dc44cb063b 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/Sequences.java @@ -27,7 +27,6 @@ import java.io.Closeable; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.concurrent.Executor; @@ -41,22 +40,7 @@ public class Sequences public static Sequence simple(final Iterable iterable) { - return new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public Iterator make() - { - return iterable.iterator(); - } - - @Override - public void cleanup(Iterator iterFromMake) - { - - } - } - ); + return new SimpleSequence<>(iterable); } @SuppressWarnings("unchecked") diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/SimpleSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/SimpleSequence.java new file mode 100644 index 00000000000..48fa4c68817 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/SimpleSequence.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import java.util.Iterator; + +/** + * Simple Sequence based on an Iterable, created using {@link Sequences#simple}. + */ +public class SimpleSequence extends BaseSequence> +{ + private final Iterable iterable; + + SimpleSequence(final Iterable iterable) + { + super( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return iterable.iterator(); + } + + @Override + public void cleanup(Iterator iterFromMake) + { + + } + } + ); + + this.iterable = iterable; + } + + /** + * Retrieve the original Iterable. + */ + public Iterable getIterable() + { + return iterable; + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/BaseSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/BaseSequenceTest.java index 9e41faf3c96..3417e05b694 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/guava/BaseSequenceTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/BaseSequenceTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** + * */ public class BaseSequenceTest { @@ -35,14 +36,14 @@ public class BaseSequenceTest public void testSanity() throws Exception { final List vals = Arrays.asList(1, 2, 3, 4, 5); - SequenceTestHelper.testAll(Sequences.simple(vals), vals); + SequenceTestHelper.testAll(makeBaseSequence(vals), vals); } @Test public void testNothing() throws Exception { final List vals = Collections.emptyList(); - SequenceTestHelper.testAll(Sequences.simple(vals), vals); + SequenceTestHelper.testAll(makeBaseSequence(vals), vals); } @Test @@ -88,4 +89,23 @@ public class BaseSequenceTest SequenceTestHelper.testClosed(closedCounter, seq); } + private static Sequence makeBaseSequence(final Iterable iterable) + { + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return iterable.iterator(); + } + + @Override + public void cleanup(Iterator iterFromMake) + { + + } + } + ); + } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/SimpleSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/SimpleSequenceTest.java new file mode 100644 index 00000000000..59f9c78c1ff --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/guava/SimpleSequenceTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.guava; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class SimpleSequenceTest +{ + @Test + public void testSanity() throws Exception + { + final List vals = Arrays.asList(1, 2, 3, 4, 5); + SequenceTestHelper.testAll(Sequences.simple(vals), vals); + } + + @Test + public void testNothing() throws Exception + { + final List vals = Collections.emptyList(); + SequenceTestHelper.testAll(Sequences.simple(vals), vals); + } + + @Test + public void testGetIterable() + { + final List vals = Collections.singletonList(1); + Assert.assertSame(vals, ((SimpleSequence) Sequences.simple(vals)).getIterable()); + } + + @Test + public void testToList() + { + final List vals = Arrays.asList(1, 2); + Assert.assertEquals(vals, Sequences.simple(vals).toList()); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java b/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java index 132715998c5..51d8f8994c5 100644 --- a/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java +++ b/processing/src/main/java/org/apache/druid/query/lookup/LookupSegment.java @@ -20,6 +20,7 @@ package org.apache.druid.query.lookup; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.segment.RowAdapter; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.column.ColumnType; @@ -47,7 +48,7 @@ public class LookupSegment extends RowBasedSegment> { super( SegmentId.dummy(lookupName), - () -> { + Sequences.simple(() -> { final LookupExtractor extractor = lookupExtractorFactory.get(); if (!extractor.canIterate()) { @@ -55,7 +56,7 @@ public class LookupSegment extends RowBasedSegment> } return extractor.iterable().iterator(); - }, + }), new RowAdapter>() { @Override diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java index 36e84e8bd0e..d9b2cfbd324 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedSegment.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; @@ -48,20 +49,21 @@ public class RowBasedSegment implements Segment * field, even if it doesn't appear in "rowSignature". * * @param segmentId segment identifier; will be returned by {@link #getId()} - * @param rowIterable objects that comprise this segment + * @param rowSequence objects that comprise this segment. Must be re-iterable if support for {@link Cursor#reset()} + * is required. Otherwise, does not need to be re-iterable. * @param rowAdapter adapter used for reading these objects * @param rowSignature signature of the columns in these objects */ public RowBasedSegment( final SegmentId segmentId, - final Iterable rowIterable, + final Sequence rowSequence, final RowAdapter rowAdapter, final RowSignature rowSignature ) { this.segmentId = Preconditions.checkNotNull(segmentId, "segmentId"); this.storageAdapter = new RowBasedStorageAdapter<>( - rowIterable, + rowSequence, rowAdapter, rowSignature ); diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java index 843972564ac..f40f83e8928 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.SimpleSequence; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.column.ColumnCapabilities; @@ -48,17 +49,17 @@ import java.util.List; */ public class RowBasedStorageAdapter implements StorageAdapter { - private final Iterable rowIterable; + private final Sequence rowSequence; private final RowAdapter rowAdapter; private final RowSignature rowSignature; RowBasedStorageAdapter( - final Iterable rowIterable, + final Sequence rowSequence, final RowAdapter rowAdapter, final RowSignature rowSignature ) { - this.rowIterable = Preconditions.checkNotNull(rowIterable, "rowIterable"); + this.rowSequence = Preconditions.checkNotNull(rowSequence, "rowSequence"); this.rowAdapter = Preconditions.checkNotNull(rowAdapter, "rowAdapter"); this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); } @@ -123,8 +124,12 @@ public class RowBasedStorageAdapter implements StorageAdapter @Override public int getNumRows() { - if (rowIterable instanceof Collection) { - return ((Collection) rowIterable).size(); + if (rowSequence instanceof SimpleSequence) { + final Iterable rowIterable = ((SimpleSequence) rowSequence).getIterable(); + + if (rowIterable instanceof Collection) { + return ((Collection) rowIterable).size(); + } } // getNumRows is only used by tests and by segmentMetadataQuery (which would be odd to call on inline datasources) @@ -161,7 +166,7 @@ public class RowBasedStorageAdapter implements StorageAdapter } final RowWalker rowWalker = new RowWalker<>( - descending ? reverse(rowIterable) : rowIterable, + descending ? reverse(rowSequence) : rowSequence, rowAdapter ); @@ -171,7 +176,7 @@ public class RowBasedStorageAdapter implements StorageAdapter Iterables.transform( descending ? reverse(bucketIntervals) : bucketIntervals, bucketInterval -> - new RowBasedCursor<>( + (Cursor) new RowBasedCursor<>( rowWalker, rowAdapter, filter, @@ -182,7 +187,25 @@ public class RowBasedStorageAdapter implements StorageAdapter rowSignature ) ) - ); + ).withBaggage(rowWalker::close); + } + + /** + * Reverse a Sequence. + * + * If the Sequence is a {@link SimpleSequence}, this avoids materialization because its + * {@link SimpleSequence#toList()} method returns a view of the underlying list. Otherwise, the list will be + * materialized and then reversed. + */ + private static Sequence reverse(final Sequence sequence) + { + if (sequence instanceof SimpleSequence) { + // Extract the Iterable from the SimpleSequence, so we can reverse it without copying if it is List-backed. + return Sequences.simple(reverse(((SimpleSequence) sequence).getIterable())); + } else { + // Materialize and reverse the objects. + return Sequences.simple(Lists.reverse(sequence.toList())); + } } /** @@ -191,8 +214,7 @@ public class RowBasedStorageAdapter implements StorageAdapter private static Iterable reverse(final Iterable iterable) { if (iterable instanceof List) { - //noinspection unchecked, rawtypes - return Lists.reverse((List) iterable); + return Lists.reverse((List) iterable); } else { // Materialize and reverse the objects. Note that this means reversing non-List Iterables will use extra memory. return Lists.reverse(Lists.newArrayList(iterable)); diff --git a/processing/src/main/java/org/apache/druid/segment/RowWalker.java b/processing/src/main/java/org/apache/druid/segment/RowWalker.java index 2a0e2ab5a6e..9f5c89a6437 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowWalker.java +++ b/processing/src/main/java/org/apache/druid/segment/RowWalker.java @@ -19,11 +19,14 @@ package org.apache.druid.segment; -import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.util.Iterator; +import java.io.IOException; import java.util.function.ToLongFunction; /** @@ -32,61 +35,69 @@ import java.util.function.ToLongFunction; */ public class RowWalker { - private final Iterable rowIterable; + private final Sequence rowSequence; private final ToLongFunction timestampFunction; - @Nullable - private Iterator rowIterator; + @Nullable // null = closed + private Yielder rowYielder; - @Nullable - private T current = null; - - RowWalker(final Iterable rowIterable, final RowAdapter rowAdapter) + RowWalker(final Sequence rowSequence, final RowAdapter rowAdapter) { - this.rowIterable = rowIterable; + this.rowSequence = rowSequence; this.timestampFunction = rowAdapter.timestampFunction(); - - reset(); + this.rowYielder = Yielders.each(rowSequence); } public boolean isDone() { - return current == null; + return rowYielder == null || rowYielder.isDone(); } public T currentRow() { - return Preconditions.checkNotNull(current, "cannot call currentRow when isDone == true"); + if (isDone()) { + throw new ISE("cannot call currentRow when isDone == true"); + } + + return rowYielder.get(); } public void advance() { - if (rowIterator == null) { - throw new IllegalStateException("cannot call advance when isDone == true"); - } else if (rowIterator.hasNext()) { - current = rowIterator.next(); - - if (current == null) { - throw new NullPointerException("null row encountered in walker"); - } + if (isDone()) { + throw new ISE("cannot call advance when isDone == true"); } else { - rowIterator = null; - current = null; + rowYielder = rowYielder.next(null); } } public void reset() { - rowIterator = rowIterable.iterator(); - advance(); + close(); + rowYielder = Yielders.each(rowSequence); } public void skipToDateTime(final DateTime timestamp, final boolean descending) { - while (current != null && (descending - ? timestamp.isBefore(timestampFunction.applyAsLong(current)) - : timestamp.isAfter(timestampFunction.applyAsLong(current)))) { + while (!isDone() && (descending + ? timestamp.isBefore(timestampFunction.applyAsLong(rowYielder.get())) + : timestamp.isAfter(timestampFunction.applyAsLong(rowYielder.get())))) { advance(); } } + + public void close() + { + if (rowYielder != null) { + try { + rowYielder.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + rowYielder = null; + } + } + } } diff --git a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java index 96167fd38bd..74a07138a39 100644 --- a/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/lookup/LookupSegmentTest.java @@ -21,7 +21,6 @@ package org.apache.druid.query.lookup; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -184,17 +183,22 @@ public class LookupSegmentTest null ); - final Cursor cursor = Iterables.getOnlyElement(cursors.toList()); final List> kvs = new ArrayList<>(); + cursors.accumulate( + null, + (ignored, cursor) -> { + final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k"); + final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); - final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k"); - final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v"); + while (!cursor.isDone()) { + kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject()))); + cursor.advanceUninterruptibly(); + } - while (!cursor.isDone()) { - kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject()))); - cursor.advanceUninterruptibly(); - } + return null; + } + ); Assert.assertEquals( ImmutableList.of( diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java index 4433fc0a614..fee5b4b9de7 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java @@ -88,40 +88,46 @@ public class ScanQueryResultOrderingTest private static final List> SEGMENTS = ImmutableList.of( new RowBasedSegment<>( SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 0), - ImmutableList.of( - new Object[]{DateTimes.of("2000T01"), 101}, - new Object[]{DateTimes.of("2000T01"), 80}, - new Object[]{DateTimes.of("2000T01"), 232}, - new Object[]{DateTimes.of("2000T01"), 12}, - new Object[]{DateTimes.of("2000T02"), 808}, - new Object[]{DateTimes.of("2000T02"), 411}, - new Object[]{DateTimes.of("2000T02"), 383}, - new Object[]{DateTimes.of("2000T05"), 22} + Sequences.simple( + ImmutableList.of( + new Object[]{DateTimes.of("2000T01"), 101}, + new Object[]{DateTimes.of("2000T01"), 80}, + new Object[]{DateTimes.of("2000T01"), 232}, + new Object[]{DateTimes.of("2000T01"), 12}, + new Object[]{DateTimes.of("2000T02"), 808}, + new Object[]{DateTimes.of("2000T02"), 411}, + new Object[]{DateTimes.of("2000T02"), 383}, + new Object[]{DateTimes.of("2000T05"), 22} + ) ), ROW_ADAPTER, ROW_SIGNATURE ), new RowBasedSegment<>( SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 1), - ImmutableList.of( - new Object[]{DateTimes.of("2000T01"), 333}, - new Object[]{DateTimes.of("2000T01"), 222}, - new Object[]{DateTimes.of("2000T01"), 444}, - new Object[]{DateTimes.of("2000T01"), 111}, - new Object[]{DateTimes.of("2000T03"), 555}, - new Object[]{DateTimes.of("2000T03"), 999}, - new Object[]{DateTimes.of("2000T03"), 888}, - new Object[]{DateTimes.of("2000T05"), 777} + Sequences.simple( + ImmutableList.of( + new Object[]{DateTimes.of("2000T01"), 333}, + new Object[]{DateTimes.of("2000T01"), 222}, + new Object[]{DateTimes.of("2000T01"), 444}, + new Object[]{DateTimes.of("2000T01"), 111}, + new Object[]{DateTimes.of("2000T03"), 555}, + new Object[]{DateTimes.of("2000T03"), 999}, + new Object[]{DateTimes.of("2000T03"), 888}, + new Object[]{DateTimes.of("2000T05"), 777} + ) ), ROW_ADAPTER, ROW_SIGNATURE ), new RowBasedSegment<>( SegmentId.of(DATASOURCE, Intervals.of("2000-01-02/P1D"), "1", 0), - ImmutableList.of( - new Object[]{DateTimes.of("2000-01-02T00"), 7}, - new Object[]{DateTimes.of("2000-01-02T02"), 9}, - new Object[]{DateTimes.of("2000-01-02T03"), 8} + Sequences.simple( + ImmutableList.of( + new Object[]{DateTimes.of("2000-01-02T00"), 7}, + new Object[]{DateTimes.of("2000-01-02T02"), 9}, + new Object[]{DateTimes.of("2000-01-02T03"), 8} + ) ), ROW_ADAPTER, ROW_SIGNATURE diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index 0f4ecddde99..fc7bc7ec54c 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.column.ColumnCapabilities; @@ -209,7 +210,7 @@ public class IndexBuilder { return new RowBasedSegment<>( SegmentId.dummy("IndexBuilder"), - rows, + Sequences.simple(rows), RowAdapters.standardRow(), RowSignature.empty() ); @@ -227,7 +228,7 @@ public class IndexBuilder return new RowBasedSegment<>( SegmentId.dummy("IndexBuilder"), - rows, + Sequences.simple(rows), RowAdapters.standardRow(), rowSignatureBuilder.build() ); diff --git a/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java index b3ab7b71063..8e3e5c3d612 100644 --- a/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/RowBasedStorageAdapterTest.java @@ -49,6 +49,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToLongFunction; @@ -207,18 +208,21 @@ public class RowBasedStorageAdapterTest } }; - private static RowBasedStorageAdapter createIntAdapter(final int... ints) + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + public final AtomicLong numCloses = new AtomicLong(); + + private RowBasedStorageAdapter createIntAdapter(final int... ints) { return new RowBasedStorageAdapter<>( - Arrays.stream(ints).boxed().collect(Collectors.toList()), + Sequences.simple(Arrays.stream(ints).boxed().collect(Collectors.toList())) + .withBaggage(numCloses::incrementAndGet), ROW_ADAPTER, ROW_SIGNATURE ); } - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Test public void test_getInterval() { @@ -462,6 +466,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -485,6 +491,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -516,6 +524,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -540,6 +550,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -560,6 +572,8 @@ public class RowBasedStorageAdapterTest ImmutableList.of(), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -582,6 +596,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -608,6 +624,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_TIME_AND_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -632,6 +650,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_TIME_AND_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -656,6 +676,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, READ_TIME_AND_STRING) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -767,6 +789,8 @@ public class RowBasedStorageAdapterTest ), walkCursors(cursors, new ArrayList<>(PROCESSORS.values())) ); + + Assert.assertEquals(1, numCloses.get()); } @Test @@ -787,6 +811,8 @@ public class RowBasedStorageAdapterTest ImmutableList.of(), walkCursors(cursors, new ArrayList<>(PROCESSORS.values())) ); + + Assert.assertEquals(1, numCloses.get()); } private static List> walkCursors( diff --git a/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java b/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java index 6417df0e6d1..a68859359cd 100644 --- a/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java +++ b/server/src/main/java/org/apache/druid/segment/InlineSegmentWrangler.java @@ -19,6 +19,7 @@ package org.apache.druid.segment; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.DataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.segment.join.JoinableFactory; @@ -44,7 +45,7 @@ public class InlineSegmentWrangler implements SegmentWrangler return Collections.singletonList( new RowBasedSegment<>( SegmentId.dummy(SEGMENT_ID), - inlineDataSource.getRows(), + Sequences.simple(inlineDataSource.getRows()), inlineDataSource.rowAdapter(), inlineDataSource.getRowSignature() ) diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index cf62a140066..dfb02019b0d 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -1436,7 +1436,7 @@ public class ClientQuerySegmentWalkerTest ReferenceCountingSegment.wrapSegment( new RowBasedSegment<>( SegmentId.of(name, INTERVAL, VERSION, SHARD_SPEC.getPartitionNum()), - dataSource.getRows(), + Sequences.simple(dataSource.getRows()), dataSource.rowAdapter(), dataSource.getRowSignature() ),