SQL: OOify field collection and depth calculation (elastic/x-pack-elasticsearch#3465)
Converts the collection of fields and the calculation of depth for `ProcessorDefinition`s to OO style tree traversal without the need for `Node` or `instanceof` tests. Original commit: elastic/x-pack-elasticsearch@5d0517af29
This commit is contained in:
parent
530c0f3dcb
commit
260e5ae6cf
|
@ -24,4 +24,13 @@ public interface FieldExtraction {
|
|||
* ({@code true}) or should it force a scroll query ({@code false})?
|
||||
*/
|
||||
boolean supportedByAggsOnlyQuery();
|
||||
|
||||
/**
|
||||
* Indicates the depth of the result. Used for counting the actual size of a
|
||||
* result by knowing how many nested levels there are. Typically used by
|
||||
* aggregations.
|
||||
*
|
||||
* @return depth of the result
|
||||
*/
|
||||
int depth();
|
||||
}
|
||||
|
|
|
@ -7,8 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definit
|
|||
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
|
||||
public class AggNameInput extends NonExecutableInput<String> {
|
||||
|
||||
public class AggNameInput extends CommonNonExecutableInput<String> {
|
||||
public AggNameInput(Expression expression, String context) {
|
||||
super(expression, context);
|
||||
}
|
||||
|
@ -19,7 +18,7 @@ public class AggNameInput extends NonExecutableInput<String> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public final ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
public final boolean resolved() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,13 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class AggPathInput extends NonExecutableInput<String> {
|
||||
public class AggPathInput extends CommonNonExecutableInput<String> {
|
||||
|
||||
private final String innerKey;
|
||||
// used in case the agg itself is not returned in a suitable format (like date aggs)
|
||||
|
@ -48,11 +49,6 @@ public class AggPathInput extends NonExecutableInput<String> {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(context(), innerKey);
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.MatrixFieldProcessor;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
@ -43,6 +44,16 @@ public class AggValueInput extends LeafInput<Supplier<Object>> {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// Nothing to collect
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(context(), innerKey);
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
|
||||
|
@ -26,4 +27,14 @@ public class AttributeInput extends NonExecutableInput<Attribute> {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return new ReferenceInput(expression(), resolver.resolve(context()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// Nothing to extract
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -42,6 +43,11 @@ public abstract class BinaryProcessorDefinition extends ProcessorDefinition {
|
|||
return replaceChildren(newLeft, newRight);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return Math.max(left.depth(), right.depth());
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a copy of this object with new left and right children. Used by
|
||||
* {@link #resolveAttributes(AttributeResolver)}.
|
||||
|
@ -52,4 +58,10 @@ public abstract class BinaryProcessorDefinition extends ProcessorDefinition {
|
|||
public boolean resolved() {
|
||||
return left().resolved() && right().resolved();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
left.collectFields(sourceBuilder);
|
||||
right.collectFields(sourceBuilder);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
/**
|
||||
* Implementation common to most subclasses of
|
||||
* {@link NonExecutableInput} but not shared by all.
|
||||
*/
|
||||
abstract class CommonNonExecutableInput<T> extends NonExecutableInput<T> {
|
||||
CommonNonExecutableInput(Expression expression, T context) {
|
||||
super(expression, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Processor asProcessor() {
|
||||
throw new SqlIllegalArgumentException("Unresolved input - needs resolving first");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// Nothing to extract
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.ConstantProcessor;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
@ -29,4 +30,14 @@ public class ConstantInput extends LeafInput<Object> {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// Nothing to collect
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.HitExtractorProcessor;
|
||||
|
@ -30,4 +31,14 @@ public class HitExtractorInput extends LeafInput<HitExtractor> {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// No fields to collect
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.FieldExtraction;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
@ -32,11 +31,6 @@ public abstract class ProcessorDefinition extends Node<ProcessorDefinition> impl
|
|||
|
||||
public abstract Processor asProcessor();
|
||||
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// No fields needed
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve {@link Attribute}s which are unprocessable into
|
||||
* {@link ColumnReference}s which are processable.
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.querydsl.container.ColumnReference;
|
||||
|
||||
|
@ -22,4 +23,14 @@ public class ReferenceInput extends NonExecutableInput<ColumnReference> {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
context().collectFields(sourceBuilder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int depth() {
|
||||
return context().depth();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,11 +29,6 @@ public class ScoreProcessorDefinition extends ProcessorDefinition {
|
|||
return new HitExtractorProcessor(ScoreExtractor.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
sourceBuilder.trackScores();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportedByAggsOnlyQuery() {
|
||||
return false;
|
||||
|
@ -43,4 +38,14 @@ public class ScoreProcessorDefinition extends ProcessorDefinition {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
sourceBuilder.trackScores();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.ChainingProcessor;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
@ -56,6 +57,16 @@ public final class UnaryProcessorDefinition extends ProcessorDefinition {
|
|||
return new UnaryProcessorDefinition(expression(), newChild, action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
child.collectFields(sourceBuilder);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int depth() {
|
||||
return child.depth();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(expression(), child, action);
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package org.elasticsearch.xpack.sql.querydsl.container;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.FieldExtraction;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
|
||||
/**
|
||||
* Entity representing a 'column' backed by one or multiple results from ES. A
|
||||
|
@ -14,13 +13,5 @@ import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
|||
* set, so extends {@link FieldExtraction}.
|
||||
*/
|
||||
public interface ColumnReference extends FieldExtraction {
|
||||
|
||||
/**
|
||||
* Indicates the depth of the result. Used for counting the actual size of a
|
||||
* result by knowing how many nested levels there are. Typically used by
|
||||
* aggregations.
|
||||
*
|
||||
* @return depth of the result
|
||||
*/
|
||||
int depth();
|
||||
// TODO remove this interface intirely in a followup
|
||||
}
|
||||
|
|
|
@ -5,32 +5,15 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.querydsl.container;
|
||||
|
||||
import org.elasticsearch.xpack.sql.execution.search.FieldExtraction;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ProcessorDefinition;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ReferenceInput;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ScoreProcessorDefinition;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ComputedRef implements ColumnReference {
|
||||
|
||||
private final ProcessorDefinition processor;
|
||||
private final int depth;
|
||||
|
||||
public ComputedRef(ProcessorDefinition processor) {
|
||||
this.processor = processor;
|
||||
|
||||
// compute maximum depth
|
||||
AtomicInteger d = new AtomicInteger(0);
|
||||
processor.forEachDown(i -> {
|
||||
ColumnReference ref = i.context();
|
||||
if (ref.depth() > d.get()) {
|
||||
d.set(ref.depth());
|
||||
}
|
||||
}, ReferenceInput.class);
|
||||
|
||||
depth = d.get();
|
||||
}
|
||||
|
||||
public ProcessorDefinition processor() {
|
||||
|
@ -39,7 +22,7 @@ public class ComputedRef implements ColumnReference {
|
|||
|
||||
@Override
|
||||
public int depth() {
|
||||
return depth;
|
||||
return processor.depth();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -50,7 +33,6 @@ public class ComputedRef implements ColumnReference {
|
|||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
processor.collectFields(sourceBuilder);
|
||||
processor.forEachUp(ri -> ri.context().collectFields(sourceBuilder), ReferenceInput.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -5,7 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ProcessorDefinition.AttributeResolver;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
|
@ -48,6 +50,49 @@ public class BinaryProcessorDefinitionTests extends ESTestCase {
|
|||
assertEquals(expected, d.resolveAttributes(resolver));
|
||||
}
|
||||
|
||||
public void testCollectFields() {
|
||||
DummyProcessorDefinition wantsScore = new DummyProcessorDefinition(randomBoolean()) {
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
sourceBuilder.trackScores();
|
||||
}
|
||||
};
|
||||
DummyProcessorDefinition wantsNothing = new DummyProcessorDefinition(randomBoolean());
|
||||
assertFalse(tracksScores(new DummyBinaryProcessorDefinition(wantsNothing, wantsNothing)));
|
||||
assertTrue(tracksScores(new DummyBinaryProcessorDefinition(wantsScore, wantsNothing)));
|
||||
assertTrue(tracksScores(new DummyBinaryProcessorDefinition(wantsNothing, wantsScore)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the processor defintion builds a query that
|
||||
* tracks scores, {@code false} otherwise. Used for testing
|
||||
* {@link ProcessorDefinition#collectFields(SqlSourceBuilder)}.
|
||||
*/
|
||||
static boolean tracksScores(ProcessorDefinition d) {
|
||||
SqlSourceBuilder b = new SqlSourceBuilder();
|
||||
d.collectFields(b);
|
||||
SearchSourceBuilder source = new SearchSourceBuilder();
|
||||
b.build(source);
|
||||
return source.trackScores();
|
||||
}
|
||||
|
||||
public void testDepth() {
|
||||
ProcessorDefinition first = dummyWithDepth(randomInt());
|
||||
ProcessorDefinition second = dummyWithDepth(randomInt());
|
||||
int maxDepth = Math.max(first.depth(), second.depth());
|
||||
assertEquals(maxDepth, new DummyBinaryProcessorDefinition(first, second).depth());
|
||||
assertEquals(maxDepth, new DummyBinaryProcessorDefinition(second, first).depth());
|
||||
}
|
||||
|
||||
static ProcessorDefinition dummyWithDepth(int depth) {
|
||||
return new DummyProcessorDefinition(randomBoolean()) {
|
||||
@Override
|
||||
public int depth() {
|
||||
return depth;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static final class DummyBinaryProcessorDefinition extends BinaryProcessorDefinition {
|
||||
public DummyBinaryProcessorDefinition(ProcessorDefinition left, ProcessorDefinition right) {
|
||||
super(null, left, right);
|
||||
|
@ -91,5 +136,14 @@ public class BinaryProcessorDefinitionTests extends ESTestCase {
|
|||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int depth() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,10 +6,13 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.BinaryProcessorDefinitionTests.DummyProcessorDefinition;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ProcessorDefinition.AttributeResolver;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
import static org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.BinaryProcessorDefinitionTests.dummyWithDepth;
|
||||
import static org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.BinaryProcessorDefinitionTests.tracksScores;
|
||||
import static java.util.Collections.emptyList;
|
||||
|
||||
public class UnaryProcessorDefinitionTests extends ESTestCase {
|
||||
|
@ -43,6 +46,23 @@ public class UnaryProcessorDefinitionTests extends ESTestCase {
|
|||
assertEquals(expected, d.resolveAttributes(resolver));
|
||||
}
|
||||
|
||||
public void testCollectFields() {
|
||||
DummyProcessorDefinition wantsScore = new DummyProcessorDefinition(randomBoolean()) {
|
||||
@Override
|
||||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
sourceBuilder.trackScores();
|
||||
}
|
||||
};
|
||||
DummyProcessorDefinition wantsNothing = new DummyProcessorDefinition(randomBoolean());
|
||||
assertFalse(tracksScores(newUnaryProcessor(wantsNothing)));
|
||||
assertTrue(tracksScores(newUnaryProcessor(wantsScore)));
|
||||
}
|
||||
|
||||
public void testDepth() {
|
||||
ProcessorDefinition child = dummyWithDepth(randomInt());
|
||||
assertEquals(child.depth(), newUnaryProcessor(child).depth());
|
||||
}
|
||||
|
||||
private ProcessorDefinition newUnaryProcessor(ProcessorDefinition child) {
|
||||
return new UnaryProcessorDefinition(null, child, null);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue