mirror of https://github.com/apache/druid.git
RowBasedSegment: Use Sequence instead of Iterable. (#11886)
* RowBasedSegment: Use Sequence instead of Iterable. The main reason this is good is that Sequences can include baggage that must be closed after iteration is finished. This enables creating RowBasedSegments on top of closeable sequences of rows. To preserve the optimization that allows reversing a List without copying it, this patch also makes SimpleSequence its own class and allows extracting the Iterable that was used to create it. * Fix tests.
This commit is contained in:
parent
db4d157be6
commit
14b0b4aee2
|
@ -27,7 +27,6 @@ import java.io.Closeable;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
|
@ -41,22 +40,7 @@ public class Sequences
|
|||
|
||||
public static <T> Sequence<T> simple(final Iterable<T> iterable)
|
||||
{
|
||||
return new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||
{
|
||||
@Override
|
||||
public Iterator<T> make()
|
||||
{
|
||||
return iterable.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(Iterator<T> iterFromMake)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
return new SimpleSequence<>(iterable);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.java.util.common.guava;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Simple Sequence based on an Iterable, created using {@link Sequences#simple}.
|
||||
*/
|
||||
public class SimpleSequence<T> extends BaseSequence<T, Iterator<T>>
|
||||
{
|
||||
private final Iterable<T> iterable;
|
||||
|
||||
SimpleSequence(final Iterable<T> iterable)
|
||||
{
|
||||
super(
|
||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||
{
|
||||
@Override
|
||||
public Iterator<T> make()
|
||||
{
|
||||
return iterable.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(Iterator<T> iterFromMake)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.iterable = iterable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the original Iterable.
|
||||
*/
|
||||
public Iterable<T> getIterable()
|
||||
{
|
||||
return iterable;
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class BaseSequenceTest
|
||||
{
|
||||
|
@ -35,14 +36,14 @@ public class BaseSequenceTest
|
|||
public void testSanity() throws Exception
|
||||
{
|
||||
final List<Integer> vals = Arrays.asList(1, 2, 3, 4, 5);
|
||||
SequenceTestHelper.testAll(Sequences.simple(vals), vals);
|
||||
SequenceTestHelper.testAll(makeBaseSequence(vals), vals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNothing() throws Exception
|
||||
{
|
||||
final List<Integer> vals = Collections.emptyList();
|
||||
SequenceTestHelper.testAll(Sequences.simple(vals), vals);
|
||||
SequenceTestHelper.testAll(makeBaseSequence(vals), vals);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -88,4 +89,23 @@ public class BaseSequenceTest
|
|||
SequenceTestHelper.testClosed(closedCounter, seq);
|
||||
}
|
||||
|
||||
private static <T> Sequence<T> makeBaseSequence(final Iterable<T> iterable)
|
||||
{
|
||||
return new BaseSequence<>(
|
||||
new BaseSequence.IteratorMaker<T, Iterator<T>>()
|
||||
{
|
||||
@Override
|
||||
public Iterator<T> make()
|
||||
{
|
||||
return iterable.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(Iterator<T> iterFromMake)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.java.util.common.guava;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class SimpleSequenceTest
|
||||
{
|
||||
@Test
|
||||
public void testSanity() throws Exception
|
||||
{
|
||||
final List<Integer> vals = Arrays.asList(1, 2, 3, 4, 5);
|
||||
SequenceTestHelper.testAll(Sequences.simple(vals), vals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNothing() throws Exception
|
||||
{
|
||||
final List<Integer> vals = Collections.emptyList();
|
||||
SequenceTestHelper.testAll(Sequences.simple(vals), vals);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetIterable()
|
||||
{
|
||||
final List<Integer> vals = Collections.singletonList(1);
|
||||
Assert.assertSame(vals, ((SimpleSequence<Integer>) Sequences.simple(vals)).getIterable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToList()
|
||||
{
|
||||
final List<Integer> vals = Arrays.asList(1, 2);
|
||||
Assert.assertEquals(vals, Sequences.simple(vals).toList());
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.query.lookup;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.segment.RowAdapter;
|
||||
import org.apache.druid.segment.RowBasedSegment;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
|
@ -47,7 +48,7 @@ public class LookupSegment extends RowBasedSegment<Map.Entry<String, String>>
|
|||
{
|
||||
super(
|
||||
SegmentId.dummy(lookupName),
|
||||
() -> {
|
||||
Sequences.simple(() -> {
|
||||
final LookupExtractor extractor = lookupExtractorFactory.get();
|
||||
|
||||
if (!extractor.canIterate()) {
|
||||
|
@ -55,7 +56,7 @@ public class LookupSegment extends RowBasedSegment<Map.Entry<String, String>>
|
|||
}
|
||||
|
||||
return extractor.iterable().iterator();
|
||||
},
|
||||
}),
|
||||
new RowAdapter<Map.Entry<String, String>>()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.segment;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.segment.column.RowSignature;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -48,20 +49,21 @@ public class RowBasedSegment<RowType> implements Segment
|
|||
* field, even if it doesn't appear in "rowSignature".
|
||||
*
|
||||
* @param segmentId segment identifier; will be returned by {@link #getId()}
|
||||
* @param rowIterable objects that comprise this segment
|
||||
* @param rowSequence objects that comprise this segment. Must be re-iterable if support for {@link Cursor#reset()}
|
||||
* is required. Otherwise, does not need to be re-iterable.
|
||||
* @param rowAdapter adapter used for reading these objects
|
||||
* @param rowSignature signature of the columns in these objects
|
||||
*/
|
||||
public RowBasedSegment(
|
||||
final SegmentId segmentId,
|
||||
final Iterable<RowType> rowIterable,
|
||||
final Sequence<RowType> rowSequence,
|
||||
final RowAdapter<RowType> rowAdapter,
|
||||
final RowSignature rowSignature
|
||||
)
|
||||
{
|
||||
this.segmentId = Preconditions.checkNotNull(segmentId, "segmentId");
|
||||
this.storageAdapter = new RowBasedStorageAdapter<>(
|
||||
rowIterable,
|
||||
rowSequence,
|
||||
rowAdapter,
|
||||
rowSignature
|
||||
);
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.Intervals;
|
|||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.java.util.common.guava.SimpleSequence;
|
||||
import org.apache.druid.query.QueryMetrics;
|
||||
import org.apache.druid.query.filter.Filter;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
|
@ -48,17 +49,17 @@ import java.util.List;
|
|||
*/
|
||||
public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
||||
{
|
||||
private final Iterable<RowType> rowIterable;
|
||||
private final Sequence<RowType> rowSequence;
|
||||
private final RowAdapter<RowType> rowAdapter;
|
||||
private final RowSignature rowSignature;
|
||||
|
||||
RowBasedStorageAdapter(
|
||||
final Iterable<RowType> rowIterable,
|
||||
final Sequence<RowType> rowSequence,
|
||||
final RowAdapter<RowType> rowAdapter,
|
||||
final RowSignature rowSignature
|
||||
)
|
||||
{
|
||||
this.rowIterable = Preconditions.checkNotNull(rowIterable, "rowIterable");
|
||||
this.rowSequence = Preconditions.checkNotNull(rowSequence, "rowSequence");
|
||||
this.rowAdapter = Preconditions.checkNotNull(rowAdapter, "rowAdapter");
|
||||
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
|
||||
}
|
||||
|
@ -123,8 +124,12 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
|||
@Override
|
||||
public int getNumRows()
|
||||
{
|
||||
if (rowIterable instanceof Collection) {
|
||||
return ((Collection<RowType>) rowIterable).size();
|
||||
if (rowSequence instanceof SimpleSequence) {
|
||||
final Iterable<RowType> rowIterable = ((SimpleSequence<RowType>) rowSequence).getIterable();
|
||||
|
||||
if (rowIterable instanceof Collection) {
|
||||
return ((Collection<RowType>) rowIterable).size();
|
||||
}
|
||||
}
|
||||
|
||||
// getNumRows is only used by tests and by segmentMetadataQuery (which would be odd to call on inline datasources)
|
||||
|
@ -161,7 +166,7 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
|||
}
|
||||
|
||||
final RowWalker<RowType> rowWalker = new RowWalker<>(
|
||||
descending ? reverse(rowIterable) : rowIterable,
|
||||
descending ? reverse(rowSequence) : rowSequence,
|
||||
rowAdapter
|
||||
);
|
||||
|
||||
|
@ -171,7 +176,7 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
|||
Iterables.transform(
|
||||
descending ? reverse(bucketIntervals) : bucketIntervals,
|
||||
bucketInterval ->
|
||||
new RowBasedCursor<>(
|
||||
(Cursor) new RowBasedCursor<>(
|
||||
rowWalker,
|
||||
rowAdapter,
|
||||
filter,
|
||||
|
@ -182,7 +187,25 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
|||
rowSignature
|
||||
)
|
||||
)
|
||||
);
|
||||
).withBaggage(rowWalker::close);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverse a Sequence.
|
||||
*
|
||||
* If the Sequence is a {@link SimpleSequence}, this avoids materialization because its
|
||||
* {@link SimpleSequence#toList()} method returns a view of the underlying list. Otherwise, the list will be
|
||||
* materialized and then reversed.
|
||||
*/
|
||||
private static <T> Sequence<T> reverse(final Sequence<T> sequence)
|
||||
{
|
||||
if (sequence instanceof SimpleSequence) {
|
||||
// Extract the Iterable from the SimpleSequence, so we can reverse it without copying if it is List-backed.
|
||||
return Sequences.simple(reverse(((SimpleSequence<T>) sequence).getIterable()));
|
||||
} else {
|
||||
// Materialize and reverse the objects.
|
||||
return Sequences.simple(Lists.reverse(sequence.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -191,8 +214,7 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
|
|||
private static <T> Iterable<T> reverse(final Iterable<T> iterable)
|
||||
{
|
||||
if (iterable instanceof List) {
|
||||
//noinspection unchecked, rawtypes
|
||||
return Lists.reverse((List) iterable);
|
||||
return Lists.reverse((List<T>) iterable);
|
||||
} else {
|
||||
// Materialize and reverse the objects. Note that this means reversing non-List Iterables will use extra memory.
|
||||
return Lists.reverse(Lists.newArrayList(iterable));
|
||||
|
|
|
@ -19,11 +19,14 @@
|
|||
|
||||
package org.apache.druid.segment;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.common.guava.Yielder;
|
||||
import org.apache.druid.java.util.common.guava.Yielders;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Iterator;
|
||||
import java.io.IOException;
|
||||
import java.util.function.ToLongFunction;
|
||||
|
||||
/**
|
||||
|
@ -32,61 +35,69 @@ import java.util.function.ToLongFunction;
|
|||
*/
|
||||
public class RowWalker<T>
|
||||
{
|
||||
private final Iterable<T> rowIterable;
|
||||
private final Sequence<T> rowSequence;
|
||||
private final ToLongFunction<T> timestampFunction;
|
||||
|
||||
@Nullable
|
||||
private Iterator<T> rowIterator;
|
||||
@Nullable // null = closed
|
||||
private Yielder<T> rowYielder;
|
||||
|
||||
@Nullable
|
||||
private T current = null;
|
||||
|
||||
RowWalker(final Iterable<T> rowIterable, final RowAdapter<T> rowAdapter)
|
||||
RowWalker(final Sequence<T> rowSequence, final RowAdapter<T> rowAdapter)
|
||||
{
|
||||
this.rowIterable = rowIterable;
|
||||
this.rowSequence = rowSequence;
|
||||
this.timestampFunction = rowAdapter.timestampFunction();
|
||||
|
||||
reset();
|
||||
this.rowYielder = Yielders.each(rowSequence);
|
||||
}
|
||||
|
||||
public boolean isDone()
|
||||
{
|
||||
return current == null;
|
||||
return rowYielder == null || rowYielder.isDone();
|
||||
}
|
||||
|
||||
public T currentRow()
|
||||
{
|
||||
return Preconditions.checkNotNull(current, "cannot call currentRow when isDone == true");
|
||||
if (isDone()) {
|
||||
throw new ISE("cannot call currentRow when isDone == true");
|
||||
}
|
||||
|
||||
return rowYielder.get();
|
||||
}
|
||||
|
||||
public void advance()
|
||||
{
|
||||
if (rowIterator == null) {
|
||||
throw new IllegalStateException("cannot call advance when isDone == true");
|
||||
} else if (rowIterator.hasNext()) {
|
||||
current = rowIterator.next();
|
||||
|
||||
if (current == null) {
|
||||
throw new NullPointerException("null row encountered in walker");
|
||||
}
|
||||
if (isDone()) {
|
||||
throw new ISE("cannot call advance when isDone == true");
|
||||
} else {
|
||||
rowIterator = null;
|
||||
current = null;
|
||||
rowYielder = rowYielder.next(null);
|
||||
}
|
||||
}
|
||||
|
||||
public void reset()
|
||||
{
|
||||
rowIterator = rowIterable.iterator();
|
||||
advance();
|
||||
close();
|
||||
rowYielder = Yielders.each(rowSequence);
|
||||
}
|
||||
|
||||
public void skipToDateTime(final DateTime timestamp, final boolean descending)
|
||||
{
|
||||
while (current != null && (descending
|
||||
? timestamp.isBefore(timestampFunction.applyAsLong(current))
|
||||
: timestamp.isAfter(timestampFunction.applyAsLong(current)))) {
|
||||
while (!isDone() && (descending
|
||||
? timestamp.isBefore(timestampFunction.applyAsLong(rowYielder.get()))
|
||||
: timestamp.isAfter(timestampFunction.applyAsLong(rowYielder.get())))) {
|
||||
advance();
|
||||
}
|
||||
}
|
||||
|
||||
public void close()
|
||||
{
|
||||
if (rowYielder != null) {
|
||||
try {
|
||||
rowYielder.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
finally {
|
||||
rowYielder = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.druid.query.lookup;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSortedMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
|
@ -184,17 +183,22 @@ public class LookupSegmentTest
|
|||
null
|
||||
);
|
||||
|
||||
final Cursor cursor = Iterables.getOnlyElement(cursors.toList());
|
||||
final List<Pair<String, String>> kvs = new ArrayList<>();
|
||||
|
||||
cursors.accumulate(
|
||||
null,
|
||||
(ignored, cursor) -> {
|
||||
final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k");
|
||||
final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
|
||||
|
||||
final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k");
|
||||
final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
|
||||
while (!cursor.isDone()) {
|
||||
kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject())));
|
||||
cursor.advanceUninterruptibly();
|
||||
}
|
||||
|
||||
while (!cursor.isDone()) {
|
||||
kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject())));
|
||||
cursor.advanceUninterruptibly();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
|
|
|
@ -88,40 +88,46 @@ public class ScanQueryResultOrderingTest
|
|||
private static final List<RowBasedSegment<Object[]>> SEGMENTS = ImmutableList.of(
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 0),
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000T01"), 101},
|
||||
new Object[]{DateTimes.of("2000T01"), 80},
|
||||
new Object[]{DateTimes.of("2000T01"), 232},
|
||||
new Object[]{DateTimes.of("2000T01"), 12},
|
||||
new Object[]{DateTimes.of("2000T02"), 808},
|
||||
new Object[]{DateTimes.of("2000T02"), 411},
|
||||
new Object[]{DateTimes.of("2000T02"), 383},
|
||||
new Object[]{DateTimes.of("2000T05"), 22}
|
||||
Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000T01"), 101},
|
||||
new Object[]{DateTimes.of("2000T01"), 80},
|
||||
new Object[]{DateTimes.of("2000T01"), 232},
|
||||
new Object[]{DateTimes.of("2000T01"), 12},
|
||||
new Object[]{DateTimes.of("2000T02"), 808},
|
||||
new Object[]{DateTimes.of("2000T02"), 411},
|
||||
new Object[]{DateTimes.of("2000T02"), 383},
|
||||
new Object[]{DateTimes.of("2000T05"), 22}
|
||||
)
|
||||
),
|
||||
ROW_ADAPTER,
|
||||
ROW_SIGNATURE
|
||||
),
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.of(DATASOURCE, Intervals.of("2000-01-01/P1D"), "1", 1),
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000T01"), 333},
|
||||
new Object[]{DateTimes.of("2000T01"), 222},
|
||||
new Object[]{DateTimes.of("2000T01"), 444},
|
||||
new Object[]{DateTimes.of("2000T01"), 111},
|
||||
new Object[]{DateTimes.of("2000T03"), 555},
|
||||
new Object[]{DateTimes.of("2000T03"), 999},
|
||||
new Object[]{DateTimes.of("2000T03"), 888},
|
||||
new Object[]{DateTimes.of("2000T05"), 777}
|
||||
Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000T01"), 333},
|
||||
new Object[]{DateTimes.of("2000T01"), 222},
|
||||
new Object[]{DateTimes.of("2000T01"), 444},
|
||||
new Object[]{DateTimes.of("2000T01"), 111},
|
||||
new Object[]{DateTimes.of("2000T03"), 555},
|
||||
new Object[]{DateTimes.of("2000T03"), 999},
|
||||
new Object[]{DateTimes.of("2000T03"), 888},
|
||||
new Object[]{DateTimes.of("2000T05"), 777}
|
||||
)
|
||||
),
|
||||
ROW_ADAPTER,
|
||||
ROW_SIGNATURE
|
||||
),
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.of(DATASOURCE, Intervals.of("2000-01-02/P1D"), "1", 0),
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000-01-02T00"), 7},
|
||||
new Object[]{DateTimes.of("2000-01-02T02"), 9},
|
||||
new Object[]{DateTimes.of("2000-01-02T03"), 8}
|
||||
Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Object[]{DateTimes.of("2000-01-02T00"), 7},
|
||||
new Object[]{DateTimes.of("2000-01-02T02"), 9},
|
||||
new Object[]{DateTimes.of("2000-01-02T03"), 8}
|
||||
)
|
||||
),
|
||||
ROW_ADAPTER,
|
||||
ROW_SIGNATURE
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.CountAggregatorFactory;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
|
@ -209,7 +210,7 @@ public class IndexBuilder
|
|||
{
|
||||
return new RowBasedSegment<>(
|
||||
SegmentId.dummy("IndexBuilder"),
|
||||
rows,
|
||||
Sequences.simple(rows),
|
||||
RowAdapters.standardRow(),
|
||||
RowSignature.empty()
|
||||
);
|
||||
|
@ -227,7 +228,7 @@ public class IndexBuilder
|
|||
|
||||
return new RowBasedSegment<>(
|
||||
SegmentId.dummy("IndexBuilder"),
|
||||
rows,
|
||||
Sequences.simple(rows),
|
||||
RowAdapters.standardRow(),
|
||||
rowSignatureBuilder.build()
|
||||
);
|
||||
|
|
|
@ -49,6 +49,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToLongFunction;
|
||||
|
@ -207,18 +208,21 @@ public class RowBasedStorageAdapterTest
|
|||
}
|
||||
};
|
||||
|
||||
private static RowBasedStorageAdapter<Integer> createIntAdapter(final int... ints)
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
public final AtomicLong numCloses = new AtomicLong();
|
||||
|
||||
private RowBasedStorageAdapter<Integer> createIntAdapter(final int... ints)
|
||||
{
|
||||
return new RowBasedStorageAdapter<>(
|
||||
Arrays.stream(ints).boxed().collect(Collectors.toList()),
|
||||
Sequences.simple(Arrays.stream(ints).boxed().collect(Collectors.toList()))
|
||||
.withBaggage(numCloses::incrementAndGet),
|
||||
ROW_ADAPTER,
|
||||
ROW_SIGNATURE
|
||||
);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void test_getInterval()
|
||||
{
|
||||
|
@ -462,6 +466,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -485,6 +491,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -516,6 +524,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -540,6 +550,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -560,6 +572,8 @@ public class RowBasedStorageAdapterTest
|
|||
ImmutableList.of(),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -582,6 +596,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -608,6 +624,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_TIME_AND_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -632,6 +650,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_TIME_AND_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -656,6 +676,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, READ_TIME_AND_STRING)
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -767,6 +789,8 @@ public class RowBasedStorageAdapterTest
|
|||
),
|
||||
walkCursors(cursors, new ArrayList<>(PROCESSORS.values()))
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -787,6 +811,8 @@ public class RowBasedStorageAdapterTest
|
|||
ImmutableList.of(),
|
||||
walkCursors(cursors, new ArrayList<>(PROCESSORS.values()))
|
||||
);
|
||||
|
||||
Assert.assertEquals(1, numCloses.get());
|
||||
}
|
||||
|
||||
private static List<List<Object>> walkCursors(
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.segment;
|
||||
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.query.DataSource;
|
||||
import org.apache.druid.query.InlineDataSource;
|
||||
import org.apache.druid.segment.join.JoinableFactory;
|
||||
|
@ -44,7 +45,7 @@ public class InlineSegmentWrangler implements SegmentWrangler
|
|||
return Collections.singletonList(
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.dummy(SEGMENT_ID),
|
||||
inlineDataSource.getRows(),
|
||||
Sequences.simple(inlineDataSource.getRows()),
|
||||
inlineDataSource.rowAdapter(),
|
||||
inlineDataSource.getRowSignature()
|
||||
)
|
||||
|
|
|
@ -1436,7 +1436,7 @@ public class ClientQuerySegmentWalkerTest
|
|||
ReferenceCountingSegment.wrapSegment(
|
||||
new RowBasedSegment<>(
|
||||
SegmentId.of(name, INTERVAL, VERSION, SHARD_SPEC.getPartitionNum()),
|
||||
dataSource.getRows(),
|
||||
Sequences.simple(dataSource.getRows()),
|
||||
dataSource.rowAdapter(),
|
||||
dataSource.getRowSignature()
|
||||
),
|
||||
|
|
Loading…
Reference in New Issue