Switch operators to a push-style API (#13600)

* Switch operators to a push-style API

This API generates nice stack-traces of processing
for Operators.
This commit is contained in:
imply-cheddar 2022-12-23 15:01:55 +09:00 committed by GitHub
parent 8773d619a2
commit 313d937236
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 796 additions and 360 deletions

View File

@ -19,7 +19,6 @@
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
@ -53,48 +52,34 @@ public class NaivePartitioningOperator implements Operator
}
@Override
public void open()
public void go(Receiver receiver)
{
child.open();
}
child.go(
new Receiver()
{
@Override
public boolean push(RowsAndColumns rac)
{
SortedGroupPartitioner groupPartitioner = rac.as(SortedGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(rac);
}
@Override
public RowsAndColumns next()
{
if (partitionsIter != null && partitionsIter.hasNext()) {
return partitionsIter.next();
}
partitionsIter = groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
if (child.hasNext()) {
final RowsAndColumns rac = child.next();
boolean keepItGoing = true;
while (keepItGoing && partitionsIter.hasNext()) {
keepItGoing = receiver.push(partitionsIter.next());
}
return keepItGoing;
}
SortedGroupPartitioner groupPartitioner = rac.as(SortedGroupPartitioner.class);
if (groupPartitioner == null) {
groupPartitioner = new DefaultSortedGroupPartitioner(rac);
}
partitionsIter = groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
return partitionsIter.next();
}
throw new ISE("Asked for next when already complete");
}
@Override
public boolean hasNext()
{
if (partitionsIter != null && partitionsIter.hasNext()) {
return true;
}
return child.hasNext();
}
@Override
public void close(boolean cascade)
{
if (cascade) {
child.close(cascade);
}
@Override
public void completed()
{
receiver.completed();
}
}
);
}
}

View File

@ -22,72 +22,48 @@ package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
/**
* An Operator interface that intends to align closely with the Operators that other databases would also tend
* to be implemented using.
* An Operator interface that intends to have implementations that align relatively closely with the Operators that
* other databases would also tend to be implemented using. While a lot of Operator interfaces tend to use a
* pull-based orientation, we use a push-based interface. This is to give us good stacktraces. Because of the
* organization of the go() method, the stack traces thrown out of an Operator will be
* 1. All of the go() calls from the top-level Operator down to the leaf Operator. This part of the stacktrace gives
* visibility into all of the actions that we expect the operator chain to perform.
* 2. All of the push() calls up until the exception happens, this part of the stack trace gives us a view of all
* of the things that have happened to the data up until the exception was thrown.
* <p>
* The lifecycle of an operator is that, after creation, it should be opened, and then iterated using hasNext() and
* next(). Finally, when the Operator is no longer useful, it should be closed.
* This "hour glass" structure of the stacktrace is by design. It is very important that implementations of this
* interface never resort to a fluent style, inheritance or other code structuring that removes the name of the active
* Operator from the stacktrace. It should always be possible to find ways to avoid code duplication and still keep
* the Operator's name on the stacktrace.
* <p>
* Operator's methods mimic the methods of an {@code Iterator}, but it does not implement {@code Iterator}
* intentionally. An operator should never be wrapped in an {@code Iterator}. Any code that does that should be
* considered a bug and fixed. This is for two reasons:
* <p>
* 1. An Operator should never be passed around as an {@code Iterator}. An Operator must be closed, if an operator
* gets returned as an {@code Iterator}, the code that sees the {@code Iterator} loses the knowledge that it's
* dealing with an Operator and might not close it. Even something like a {@code CloseableIterator} is an
* anti-pattern as it's possible to use it in a functional manner with code that loses track of the fact that it
* must be closed.
* 2. To avoid "fluent" style composition of functions on Operators. It is important that there never be a set of
* functional primitives for things like map/filter/reduce to "simplify" the implementation of Operators. This is
* because such fluency produces really hard to decipher stack traces as the stacktrace ends up being just a bunch
* of calls from the scaffolding (map/filter/reduce) and not from the actual Operator itself. By not implementing
* {@code Iterator} we are actively increasing the burden of trying to add such functional operations to the point
* that hopefully, through code review, we can ensure that we never develop them. It is infinitely better to preserve
* the stacktrace and "duplicate" the map/filter/reduce scaffolding code.
* The other benefit of the go() method is that it fully encapsulates the lifecycle of the underlying resources.
* This means that it should be possible to use try/finally blocks around calls to go() in order to ensure that
* resources are properly closed.
*/
public interface Operator
{
/**
* Called to initiate the lifecycle of the Operator. If an operator needs to checkout resources or anything to do
* its work, this is probably the place to do it.
* Tells the Operator to start doing its work. Data will be pushed into the Receiver.
*
* Work should *never* be done in this method, this method only exists to acquire resources that are known to be
* needed before doing any work. As a litmus test, if there is ever a call to `op.next()` inside of this method,
* then something has been done wrong as that call to `.next()` is actually doing work. Such code should be moved
* into being lazily evaluated as part of a call to `.next()`.
* @param receiver a receiver that will receive data
*/
void open();
void go(Receiver receiver);
/**
* Returns the next RowsAndColumns object that the Operator can produce. Behavior is undefined if
* {@link #hasNext} returns false.
*
* @return the next RowsAndColumns object that the operator can produce
*/
RowsAndColumns next();
interface Receiver
{
/**
* Used to push data. Return value indicates if more data will be accepted. If false, push should not
* be called anymore.
*
* @param rac {@link RowsAndColumns} of data
* @return a boolean value indicating if more data will be accepted. If false, push should never be called
* anymore
*/
boolean push(RowsAndColumns rac);
/**
* Used to identify if it is safe to call {@link #next}
*
* @return true if it is safe to call {@link #next}
*/
boolean hasNext();
/**
* Closes this Operator. The cascade flag can be used to identify that the intent is to close this operator
* and only this operator without actually closing child operators. Other databases use this sort of functionality
* with a planner that is watching over all of the objects and force-closes even if they were closed during normal
* operations. In Druid, in the data pipeline where this was introduced, we are guaranteed to always have close
* called regardless of errors or exceptions during processing, as such, at time of introduction, there is no
* call that passes false for cascade.
* <p>
* That said, given that this is a common thing for these interfaces for other databases, we want to preserve the
* optionality of being able to leverage what they do. As such, we define the method this way with the belief
* that it might be used in the future. Semantically, this means that all implementations of Operators must
* expect to be closed multiple times. I.e. after being closed, it is an error for open, next or hasNext to be
* called, but close can be called any number of times.
*
* @param cascade whether to call close on child operators.
*/
void close(boolean cascade);
/**
* Used to indicate that no more data will ever come
*/
void completed();
}
}

View File

@ -22,12 +22,21 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.function.Supplier;
/**
* Provides a sequence on top of Operators. The mis-match in pull (Sequence) and push (Operator) means that, if we
* choose to support the Yielder interface, we have to use threading. Managing extra threads in order to do that
* is unfortunate, so, we choose to take a bit of a cop-out approach.
*
* Specifically, the accumulate method doesn't actually have the same problem and the query pipeline after the merge
* functions is composed of Sequences that all use accumulate instead of yielder. Thus, if we are certain that
* we only use the OperatorSequence in places where toYielder is not called (i.e. it's only used as the return
* value of the merge() calls), then we can get away with only implementing the accumulate path.
*/
public class OperatorSequence implements Sequence<RowsAndColumns>
{
private final Supplier<Operator> opSupplier;
@ -41,24 +50,13 @@ public class OperatorSequence implements Sequence<RowsAndColumns>
@Override
public <OutType> OutType accumulate(
OutType initValue,
final OutType initValue,
Accumulator<OutType, RowsAndColumns> accumulator
)
{
Operator op = null;
try {
op = opSupplier.get();
op.open();
while (op.hasNext()) {
initValue = accumulator.accumulate(initValue, op.next());
}
return initValue;
}
finally {
if (op != null) {
op.close(true);
}
}
final MyReceiver<OutType> receiver = new MyReceiver<>(initValue, accumulator);
opSupplier.get().go(receiver);
return receiver.getRetVal();
}
@Override
@ -67,59 +65,38 @@ public class OperatorSequence implements Sequence<RowsAndColumns>
YieldingAccumulator<OutType, RowsAndColumns> accumulator
)
{
final Operator op = opSupplier.get();
try {
op.open();
// As mentioned in the class-level javadoc, we skip this implementation and leave it up to the developer to
// only use this class in "safe" locations.
throw new UnsupportedOperationException("Cannot convert an Operator to a Yielder");
}
while (!accumulator.yielded() && op.hasNext()) {
initValue = accumulator.accumulate(initValue, op.next());
}
if (accumulator.yielded()) {
OutType finalInitValue = initValue;
return new Yielder<OutType>()
{
private OutType retVal = finalInitValue;
private boolean done = false;
private static class MyReceiver<OutType> implements Operator.Receiver
{
private final Accumulator<OutType, RowsAndColumns> accumulator;
private OutType retVal;
@Override
public OutType get()
{
return retVal;
}
@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
retVal = initValue;
while (!accumulator.yielded() && op.hasNext()) {
retVal = accumulator.accumulate(retVal, op.next());
}
if (!accumulator.yielded()) {
done = true;
}
return this;
}
@Override
public boolean isDone()
{
return done;
}
@Override
public void close()
{
op.close(true);
}
};
} else {
return Yielders.done(initValue, () -> op.close(true));
}
public MyReceiver(OutType initValue, Accumulator<OutType, RowsAndColumns> accumulator)
{
this.accumulator = accumulator;
retVal = initValue;
}
catch (RuntimeException e) {
op.close(true);
throw e;
public OutType getRetVal()
{
return retVal;
}
@Override
public boolean push(RowsAndColumns rac)
{
retVal = accumulator.accumulate(retVal, rac);
return true;
}
@Override
public void completed()
{
}
}
}

View File

@ -20,13 +20,16 @@
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.Segment;
import java.io.IOException;
public class SegmentToRowsAndColumnsOperator implements Operator
{
private final Segment segment;
private boolean hasNext = true;
public SegmentToRowsAndColumnsOperator(
Segment segment
@ -36,33 +39,22 @@ public class SegmentToRowsAndColumnsOperator implements Operator
}
@Override
public void open()
public void go(Receiver receiver)
{
try (final CloseableShapeshifter shifty = segment.as(CloseableShapeshifter.class)) {
if (shifty == null) {
throw new ISE("Segment[%s] cannot shapeshift", segment.getClass());
}
}
@Override
public RowsAndColumns next()
{
hasNext = false;
RowsAndColumns rac = segment.as(RowsAndColumns.class);
if (rac != null) {
return rac;
RowsAndColumns rac = shifty.as(RowsAndColumns.class);
if (rac == null) {
throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
}
receiver.push(rac);
receiver.completed();
}
catch (IOException e) {
throw new RE(e, "Problem closing resources for segment[%s]", segment.getId());
}
throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
}
@Override
public boolean hasNext()
{
return hasNext;
}
@Override
public void close(boolean cascade)
{
}
}

View File

@ -19,21 +19,12 @@
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.io.IOException;
import java.util.NoSuchElementException;
public class SequenceOperator implements Operator
{
private final Sequence<RowsAndColumns> child;
private Yielder<RowsAndColumns> yielder;
private boolean closed = false;
public SequenceOperator(
Sequence<RowsAndColumns> child
@ -43,45 +34,15 @@ public class SequenceOperator implements Operator
}
@Override
public void open()
public void go(Receiver receiver)
{
if (closed) {
throw new ISE("Operator closed, cannot be re-opened");
}
yielder = Yielders.each(child);
}
@Override
public RowsAndColumns next()
{
if (closed) {
throw new NoSuchElementException();
}
final RowsAndColumns retVal = yielder.get();
yielder = yielder.next(null);
return retVal;
}
@Override
public boolean hasNext()
{
return !closed && !yielder.isDone();
}
@Override
public void close(boolean cascade)
{
if (closed) {
return;
}
try {
yielder.close();
}
catch (IOException e) {
throw new RE(e, "Exception when closing yielder from Sequence");
}
finally {
closed = true;
}
child.accumulate(
null,
(accumulated, in) -> {
receiver.push(in);
return accumulated;
}
);
receiver.completed();
}
}

View File

@ -40,28 +40,21 @@ public class WindowProcessorOperator implements Operator
}
@Override
public void open()
public void go(Receiver receiver)
{
child.open();
}
child.go(new Receiver()
{
@Override
public boolean push(RowsAndColumns rac)
{
return receiver.push(windowProcessor.process(rac));
}
@Override
public RowsAndColumns next()
{
return windowProcessor.process(child.next());
}
@Override
public boolean hasNext()
{
return child.hasNext();
}
@Override
public void close(boolean cascade)
{
if (cascade) {
child.close(cascade);
}
@Override
public void completed()
{
receiver.completed();
}
});
}
}

View File

@ -106,8 +106,8 @@ public class ArrayListSegment<RowType> implements Segment
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
if (RowsAndColumns.class.equals(clazz)) {
return (T) asRowsAndColumns();
if (CloseableShapeshifter.class.equals(clazz)) {
return (T) new MyCloseableShapeshifter();
}
return null;
}
@ -120,7 +120,27 @@ public class ArrayListSegment<RowType> implements Segment
private RowsAndColumns asRowsAndColumns()
{
return new ArrayListRowsAndColumns(rows, rowAdapter, rowSignature);
return new ArrayListRowsAndColumns<>(rows, rowAdapter, rowSignature);
}
private class MyCloseableShapeshifter implements CloseableShapeshifter
{
@Override
public void close()
{
}
@Nullable
@Override
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
if (RowsAndColumns.class.equals(clazz)) {
return (T) asRowsAndColumns();
}
return null;
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import javax.annotation.Nullable;
import java.io.Closeable;
/**
* A closeable holder that can present itself as other, semantically-meaningful interfaces via {@link #as(Class)}.
* <p>
* It exists so that a {@link Segment} can hand out an object which is convertible through {@link #as(Class)} while
* still giving the caller a {@link Closeable} handle to release any resources backing the converted objects. It was
* initially introduced in order to make it possible for a {@link Segment} to become a
* {@link org.apache.druid.query.rowsandcols.RowsAndColumns} without needing to add extra close() methods to
* {@link org.apache.druid.query.rowsandcols.RowsAndColumns}.
* <p>
* Because this extends {@link Closeable}, callers are expected to close the shapeshifter (for example with
* try-with-resources) once they are done with whatever {@link #as(Class)} returned.
*/
public interface CloseableShapeshifter extends Closeable
{
/**
* Asks the object to return itself as a concrete implementation of a specific interface. The interface
* asked for will tend to be a semantically-meaningful interface.
*
* @param clazz A class object representing the interface that the calling code wants a concrete implementation of
* @param <T> The interface that the calling code wants a concrete implementation of
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
* through a local implementation of the interface. Callers must be prepared to handle null.
*/
@Nullable
<T> T as(Class<T> clazz);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
/**
 * An {@link Operator.Receiver} that rejects every callback by throwing {@link UnsupportedOperationException}.
 * <p>
 * Useful when an Operator is expected to fail before it delivers anything to its receiver: if the Operator does
 * reach the receiver anyway, the exception message names the callback that was unexpectedly invoked.
 */
public class ExceptionalReceiver implements Operator.Receiver
{
  /**
   * Always fails; this receiver never accepts data.
   *
   * @param rac ignored
   * @return never returns normally
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean push(RowsAndColumns rac)
  {
    throw new UnsupportedOperationException("push() was not expected to be called on this receiver");
  }

  /**
   * Always fails; this receiver never expects the stream to complete.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void completed()
  {
    throw new UnsupportedOperationException("completed() was not expected to be called on this receiver");
  }
}

View File

@ -49,25 +49,12 @@ public class InlineScanOperator implements Operator
}
@Override
public void open()
public void go(Receiver receiver)
{
}
@Override
public RowsAndColumns next()
{
return iter.next();
}
@Override
public boolean hasNext()
{
return iter.hasNext();
}
@Override
public void close(boolean cascade)
{
iter = null;
boolean keepItGoing = true;
while (keepItGoing && iter.hasNext()) {
keepItGoing = receiver.push(iter.next());
}
receiver.completed();
}
}

View File

@ -29,9 +29,6 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class NaivePartitioningOperatorTest
{
@Test
@ -49,30 +46,49 @@ public class NaivePartitioningOperatorTest
InlineScanOperator.make(rac)
);
op.open();
new OperatorTestHelper()
.expectRowsAndColumns(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
)
.runToCompletion(op);
}
List<RowsAndColumnsHelper> expectations = Arrays.asList(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
@Test
public void testStopMidStream()
{
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
);
for (RowsAndColumnsHelper expectation : expectations) {
Assert.assertTrue(op.hasNext());
expectation.validate(op.next());
}
Assert.assertFalse(op.hasNext());
NaivePartitioningOperator op = new NaivePartitioningOperator(
ImmutableList.of("sorted"),
InlineScanOperator.make(rac)
);
op.close(true);
new OperatorTestHelper()
.expectAndStopAfter(
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0})
.expectColumn("unsorted", new int[]{3, 54, 21}),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
)
.runToCompletion(op);
}
@Test
@ -90,11 +106,17 @@ public class NaivePartitioningOperatorTest
InlineScanOperator.make(rac)
);
op.open();
boolean exceptionThrown = false;
try {
op.next();
new OperatorTestHelper()
.withPushFn(
rac1 -> {
Assert.fail("I shouldn't be called, an exception should've been thrown.");
return true;
}
)
.runToCompletion(op);
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
@ -30,27 +31,54 @@ import org.junit.Test;
public class OperatorSequenceTest
{
@Test
public void testSanity()
public void testAccumulateButNoYielder()
{
OperatorSequence seq = new OperatorSequence(
() -> InlineScanOperator.make(MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1})))
);
Assert.assertEquals(1, seq.accumulate(0, (accumulated, in) -> accumulated + 1).intValue());
final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
.expectColumn("hi", new int[]{1})
.allColumnsRegistered();
Yielder<Integer> yielder = seq.toYielder(0, new YieldingAccumulator<Integer, RowsAndColumns>()
{
@Override
public Integer accumulate(Integer accumulated, RowsAndColumns in)
Assert.assertEquals(
1,
seq.accumulate(
0,
(accumulated, in) -> {
helper.validate(in);
return accumulated + 1;
}
).intValue()
);
boolean exceptionThrown = false;
try {
Yielder<Integer> yielder = seq.toYielder(0, new YieldingAccumulator<Integer, RowsAndColumns>()
{
yield();
return accumulated + 1;
}
});
Assert.assertFalse(yielder.isDone());
Assert.assertEquals(1, yielder.get().intValue());
@Override
public Integer accumulate(Integer accumulated, RowsAndColumns in)
{
Assert.fail("This should never be called, because we expect a UOE before this point");
this.yield();
helper.validate(in);
return accumulated + 1;
}
});
yielder = yielder.next(0);
Assert.assertTrue(yielder.isDone());
// The exception will have been thrown before this point, in which case one might wonder why the code here
// remains. It is because this code is a correct validation of what should happen if OperatorSequence *did*
// implement the Yielder. It's kept for posterity in case we ever choose to implement it using threads.
Assert.assertFalse(yielder.isDone());
Assert.assertEquals(1, yielder.get().intValue());
yielder = yielder.next(0);
Assert.assertTrue(yielder.isDone());
}
catch (UnsupportedOperationException ex) {
Assert.assertEquals("Cannot convert an Operator to a Yielder", ex.getMessage());
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.junit.Assert;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
 * Test utility that runs an {@link Operator} against a {@link TestReceiver} and validates what the Operator pushes.
 * <p>
 * Typical usage: configure exactly one receiver (through {@link #expectRowsAndColumns}, {@link #expectAndStopAfter},
 * {@link #withPushFn} or {@link #withReceiver}), optionally register extra final validations with
 * {@link #withFinalValidation}, and then call {@link #runToCompletion}.
 */
public class OperatorTestHelper
{
  private Supplier<TestReceiver> receiverSupply;
  private Consumer<TestReceiver> finalValidation;

  /**
   * Expects the Operator to push exactly {@code helpers.length} objects, validating the i-th pushed object with the
   * i-th helper. The receiver always asks for more data.
   *
   * @param helpers validators, in the order in which the pushes are expected
   * @return this helper, for chaining
   */
  public OperatorTestHelper expectRowsAndColumns(RowsAndColumnsHelper... helpers)
  {
    return withPushFn(
        new JustPushMe()
        {
          int index = 0;

          @Override
          public boolean push(RowsAndColumns rac)
          {
            // Fail with a readable assertion instead of an ArrayIndexOutOfBoundsException on an extra push.
            Assert.assertTrue(
                "Operator pushed more RowsAndColumns than the " + helpers.length + " that were expected",
                index < helpers.length
            );
            helpers[index++].validate(rac);
            return true;
          }
        }
    ).withFinalValidation(
        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
    );
  }

  /**
   * Expects the Operator to push {@code helpers.length} objects and then stop: the receiver returns false from the
   * push that consumes the last helper, and any push after that is a failure.
   *
   * @param helpers validators, in the order in which the pushes are expected
   * @return this helper, for chaining
   */
  public OperatorTestHelper expectAndStopAfter(RowsAndColumnsHelper... helpers)
  {
    return withPushFn(
        new JustPushMe()
        {
          int index = 0;

          @Override
          public boolean push(RowsAndColumns rac)
          {
            // A push past the end means the Operator ignored the "stop" signal (or no helpers were supplied).
            Assert.assertTrue(
                "push() was called after the receiver asked to stop, only " + helpers.length + " pushes expected",
                index < helpers.length
            );
            helpers[index++].validate(rac);
            return index < helpers.length;
          }
        }
    ).withFinalValidation(
        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
    );
  }

  /**
   * Sets the supplier of the receiver that {@link #runToCompletion} will hand to the Operator.
   *
   * @param receiver supplier of the receiver to use
   * @return this helper, for chaining
   * @throws ISE if a receiver has already been set
   */
  public OperatorTestHelper withReceiver(Supplier<TestReceiver> receiver)
  {
    if (this.receiverSupply != null) {
      throw new ISE("Receiver[%s] already set, cannot set it again[%s].", this.receiverSupply, receiver);
    }
    this.receiverSupply = receiver;
    return this;
  }

  /**
   * Registers a validation to run against the receiver after the Operator has finished. Validations accumulate and
   * run in registration order.
   *
   * @param validator validation to run after the Operator completes
   * @return this helper, for chaining
   */
  public OperatorTestHelper withFinalValidation(Consumer<TestReceiver> validator)
  {
    if (finalValidation == null) {
      this.finalValidation = validator;
    } else {
      final Consumer<TestReceiver> subValidator = finalValidation;
      this.finalValidation = (receiver) -> {
        subValidator.accept(receiver);
        validator.accept(receiver);
      };
    }
    return this;
  }

  /**
   * Convenience for {@link #withReceiver} that wraps the given push function in a new {@link TestReceiver}.
   *
   * @param fn function invoked for every pushed object
   * @return this helper, for chaining
   */
  public OperatorTestHelper withPushFn(JustPushMe fn)
  {
    return withReceiver(() -> new TestReceiver(fn));
  }

  /**
   * Runs the Operator with the configured receiver, asserts that the receiver was completed, and then runs any
   * registered final validations.
   *
   * @param op the Operator to run
   * @return this helper, for chaining
   * @throws ISE if no receiver has been configured
   */
  public OperatorTestHelper runToCompletion(Operator op)
  {
    if (receiverSupply == null) {
      // Surface the misconfiguration explicitly rather than as a NullPointerException.
      throw new ISE("No receiver configured, set one up before calling runToCompletion.");
    }
    TestReceiver receiver = receiverSupply.get();
    op.go(receiver);
    Assert.assertTrue("Operator returned from go() without calling completed()", receiver.isCompleted());
    if (finalValidation != null) {
      finalValidation.accept(receiver);
    }
    return this;
  }

  /**
   * The push half of an {@link Operator.Receiver}; completion tracking is handled by {@link TestReceiver}.
   */
  @FunctionalInterface
  public interface JustPushMe
  {
    boolean push(RowsAndColumns rac);
  }

  /**
   * Receiver that counts pushes, delegates each push to a {@link JustPushMe}, and verifies that
   * {@link #completed()} is called at most once.
   */
  public static class TestReceiver implements Operator.Receiver
  {
    private final JustPushMe pushFn;
    private final AtomicLong numPushed = new AtomicLong();
    private final AtomicBoolean completed = new AtomicBoolean(false);

    public TestReceiver(JustPushMe pushFn)
    {
      this.pushFn = pushFn;
    }

    @Override
    public boolean push(RowsAndColumns rac)
    {
      // Count first so that the push is recorded even when the delegate throws.
      numPushed.incrementAndGet();
      return pushFn.push(rac);
    }

    public boolean isCompleted()
    {
      return completed.get();
    }

    @Override
    public void completed()
    {
      if (!completed.compareAndSet(false, true)) {
        throw new ISE("complete called more than once!? Why.");
      }
    }

    public long getNumPushed()
    {
      return numPushed.get();
    }
  }
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.segment.ArrayListSegment;
import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.TestSegmentForAs;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
public class SegmentToRowsAndColumnsOperatorTest
{
@Test
public void testSanity()
{
  // Two rows; each positional value is exposed as the column named after its index ("0", "1", "2").
  final ArrayList<Object[]> data = Lists.newArrayList(
      new Object[]{1, 2, "a"},
      new Object[]{1, 2, "b"}
  );
  final RowSignature signature = RowSignature.builder()
                                             .add("0", ColumnType.LONG)
                                             .add("1", ColumnType.DOUBLE)
                                             .add("2", ColumnType.STRING)
                                             .build();
  final ArrayListSegment<Object[]> seg = new ArrayListSegment<>(
      SegmentId.dummy("test"),
      data,
      columnName -> objects -> objects[Integer.parseInt(columnName)],
      signature
  );

  // The operator should push a single RowsAndColumns containing exactly these three columns.
  final RowsAndColumnsHelper expected = new RowsAndColumnsHelper()
      .expectColumn("0", new long[]{1, 1})
      .expectColumn("1", new double[]{2, 2})
      .expectColumn("2", ColumnType.STRING, "a", "b")
      .allColumnsRegistered();

  new OperatorTestHelper()
      .expectRowsAndColumns(expected)
      .runToCompletion(new SegmentToRowsAndColumnsOperator(seg));
}
@Test
public void testNotShapeshiftable()
{
  // The segment refuses to hand out a CloseableShapeshifter, so go() must fail before reaching the receiver.
  SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
      new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
        Assert.assertEquals(CloseableShapeshifter.class, aClass);
        return null;
      })
  );

  boolean exceptionThrown = false;
  try {
    op.go(new ExceptionalReceiver());
  }
  catch (ISE e) {
    // Expected value goes first so that a failure reports expected/actual the right way round.
    Assert.assertEquals("Segment[class org.apache.druid.segment.TestSegmentForAs] cannot shapeshift", e.getMessage());
    exceptionThrown = true;
  }
  Assert.assertTrue(exceptionThrown);
}
@Test
public void testCanShiftButNotToARAC()
{
AtomicBoolean closed = new AtomicBoolean(false);
SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
Assert.assertEquals(CloseableShapeshifter.class, aClass);
return new CloseableShapeshifter()
{
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
Assert.assertEquals(RowsAndColumns.class, clazz);
return null;
}
@Override
public void close()
{
closed.set(true);
}
};
})
);
boolean exceptionThrown = false;
try {
op.go(new ExceptionalReceiver());
}
catch (ISE e) {
Assert.assertEquals(
e.getMessage(),
"Cannot work with segment of type[class org.apache.druid.segment.TestSegmentForAs]"
);
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
Assert.assertTrue(closed.get());
}
@Test
public void testExceptionWhileClosing()
{
final MapOfColumnsRowsAndColumns expectedRac =
MapOfColumnsRowsAndColumns.of("0", new IntArrayColumn(new int[]{0, 1}));
AtomicBoolean closed = new AtomicBoolean(false);
SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
Assert.assertEquals(CloseableShapeshifter.class, aClass);
return new CloseableShapeshifter()
{
@SuppressWarnings("unchecked")
@Nullable
@Override
public <T> T as(Class<T> clazz)
{
Assert.assertEquals(RowsAndColumns.class, clazz);
return (T) expectedRac;
}
@Override
public void close() throws IOException
{
closed.set(true);
throw new IOException("ain't no thang");
}
};
})
);
boolean exceptionThrown = false;
try {
new OperatorTestHelper()
.withPushFn(rac -> {
Assert.assertSame(expectedRac, rac);
return true;
})
.runToCompletion(op);
}
catch (RE e) {
Assert.assertEquals(
e.getMessage(),
"Problem closing resources for segment[test_-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z_dummy_version]"
);
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
Assert.assertTrue(closed.get());
}
}

View File

@ -38,19 +38,17 @@ public class SequenceOperatorTest
MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1}))
)));
op.open();
RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("hi", new int[]{1})
.allColumnsRegistered();
expectations.validate(op.next());
Assert.assertTrue(op.hasNext());
expectations.validate(op.next());
Assert.assertFalse(op.hasNext());
op.close(true);
op.close(false);
new OperatorTestHelper()
.withPushFn(
rac -> {
new RowsAndColumnsHelper()
.expectColumn("hi", new int[]{1})
.allColumnsRegistered()
.validate(rac);
return true;
}
)
.withFinalValidation(testReceiver -> Assert.assertEquals(2, testReceiver.getNumPushed()))
.runToCompletion(op);
}
}

View File

@ -57,10 +57,13 @@ public class WindowProcessorOperatorTest
InlineScanOperator.make(rac)
);
op.open();
Assert.assertTrue(op.hasNext());
Assert.assertSame(rac, op.next());
Assert.assertFalse(op.hasNext());
op.close(true);
new OperatorTestHelper()
.withPushFn(
rowsAndColumns -> {
Assert.assertSame(rac, rowsAndColumns);
return true;
}
)
.runToCompletion(op);
}
}

View File

@ -96,6 +96,13 @@ public class RowsAndColumnsHelper
return this;
}
/**
 * Varargs form of the column expectation: obtains the column helper for {@code col}, sized to the number of
 * expected values and tagged with {@code type}, and sets {@code expectedVals} as its expectation.
 *
 * @param col          name of the column the expectation applies to
 * @param type         column type passed through to the column helper
 * @param expectedVals the expected values; their count is used as the expected size
 * @return this helper, so that calls can be chained
 */
public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals)
{
  columnHelper(col, expectedVals.length, type).setExpectation(expectedVals);
  return this;
}
public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, ColumnType type)
{
final ColumnHelper helper = columnHelper(col, expectedVals.length, type);

View File

@ -33,8 +33,6 @@ import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
@SuppressWarnings("unchecked")
public class WindowFramedAggregateProcessorTest
{
@ -52,9 +50,8 @@ public class WindowFramedAggregateProcessorTest
"yay", new IntArrayColumn(new int[]{1, 2, 3})
));
final RowsAndColumns processed = proc.process(new AsOnlyTestRowsAndColumns(theFrame, theAggs, rac)
final RowsAndColumns processed = proc.process(new AsOnlyTestRowsAndColumns()
{
@Nullable
@Override
public <T> T as(Class<T> clazz)
{

View File

@ -20,29 +20,12 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.column.Column;
import java.util.Collection;
public abstract class AsOnlyTestRowsAndColumns implements RowsAndColumns
{
private final WindowFrame theFrame;
private final AggregatorFactory[] theAggs;
private final MapOfColumnsRowsAndColumns rac;
public AsOnlyTestRowsAndColumns(
WindowFrame theFrame,
AggregatorFactory[] theAggs,
MapOfColumnsRowsAndColumns rac
)
{
this.theFrame = theFrame;
this.theAggs = theAggs;
this.rac = rac;
}
@Override
public Collection<String> getColumnNames()
{

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.function.Function;
/**
 * Test-only {@link Segment} whose {@link #as(Class)} answers come from a caller-supplied function. The
 * {@link #asQueryableIndex()} and {@link #asStorageAdapter()} views are routed through {@link #as(Class)} too,
 * so the function sees every request made of this segment.
 */
@SuppressWarnings("rawtypes")
public class TestSegmentForAs implements Segment
{
  private final SegmentId segmentId;
  private final Function<Class, Object> shapeshiftFn;

  /**
   * @param id   identifier reported by {@link #getId()}; its interval is reported by {@link #getDataInterval()}
   * @param asFn function invoked with the requested class on every {@link #as(Class)} call; its result is
   *             cast to the requested type and returned as-is (including null)
   */
  public TestSegmentForAs(
      SegmentId id,
      Function<Class, Object> asFn
  )
  {
    segmentId = id;
    shapeshiftFn = asFn;
  }

  /** Returns the id given at construction. */
  @Override
  public SegmentId getId()
  {
    return segmentId;
  }

  /** Returns the interval carried by the segment id. */
  @Override
  public Interval getDataInterval()
  {
    return segmentId.getInterval();
  }

  /** Delegates to {@link #as(Class)} with {@code QueryableIndex.class}. */
  @Nullable
  @Override
  public QueryableIndex asQueryableIndex()
  {
    return as(QueryableIndex.class);
  }

  /** Delegates to {@link #as(Class)} with {@code StorageAdapter.class}. */
  @Override
  public StorageAdapter asStorageAdapter()
  {
    return as(StorageAdapter.class);
  }

  /**
   * Applies the supplied function to {@code clazz} and returns whatever it produced, cast to {@code T}.
   * The cast is unchecked: it is up to the function to return an object of the requested type (or null).
   */
  @SuppressWarnings("unchecked")
  @Nullable
  @Override
  public <T> T as(@Nonnull Class<T> clazz)
  {
    final Object answer = shapeshiftFn.apply(clazz);
    return (T) answer;
  }

  @Override
  public void close()
  {
    // Nothing to release: this segment holds no resources of its own.
  }
}