SQL: Switch attribute resolution to OO (elastic/x-pack-elasticsearch#3433)
Switch attribute resolution from tree matching to OO. Adds `ProcessorDefinition#resolveAttributes` which subclasess implement to rewrite themselves against a query. `AttributeInput`s use this to replace themselves with `ReferenceInput`s. Original commit: elastic/x-pack-elasticsearch@97270d2ea4
This commit is contained in:
parent
f8da04dd3a
commit
c6c81c940e
|
@ -25,26 +25,31 @@ public class BinaryArithmeticProcessorDefinition extends BinaryProcessorDefiniti
|
|||
return operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BinaryProcessorDefinition replaceChildren(ProcessorDefinition left, ProcessorDefinition right) {
|
||||
return new BinaryArithmeticProcessorDefinition(expression(), left, right, operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BinaryArithmeticProcessor asProcessor() {
|
||||
return new BinaryArithmeticProcessor(left().asProcessor(), right().asProcessor(), operation);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(left(), right(), operation);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
BinaryArithmeticProcessorDefinition other = (BinaryArithmeticProcessorDefinition) obj;
|
||||
return Objects.equals(operation, other.operation)
|
||||
&& Objects.equals(left(), other.left())
|
||||
|
|
|
@ -17,4 +17,9 @@ public class AggNameInput extends NonExecutableInput<String> {
|
|||
public final boolean supportedByAggsOnlyQuery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,11 @@ public class AggPathInput extends NonExecutableInput<String> {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(context(), innerKey);
|
||||
|
|
|
@ -38,6 +38,11 @@ public class AggValueInput extends LeafInput<Supplier<Object>> {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(context(), innerKey);
|
||||
|
|
|
@ -8,8 +8,11 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definit
|
|||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
|
||||
/**
|
||||
* An input that must first be rewritten against the rest of the query
|
||||
* before it can be further processed.
|
||||
*/
|
||||
public class AttributeInput extends NonExecutableInput<Attribute> {
|
||||
|
||||
public AttributeInput(Expression expression, Attribute context) {
|
||||
super(expression, context);
|
||||
}
|
||||
|
@ -18,4 +21,9 @@ public class AttributeInput extends NonExecutableInput<Attribute> {
|
|||
public final boolean supportedByAggsOnlyQuery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return new ReferenceInput(expression(), resolver.resolve(context()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,22 @@ public abstract class BinaryProcessorDefinition extends ProcessorDefinition {
|
|||
return left.supportedByAggsOnlyQuery() && right.supportedByAggsOnlyQuery();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
ProcessorDefinition newLeft = left.resolveAttributes(resolver);
|
||||
ProcessorDefinition newRight = right.resolveAttributes(resolver);
|
||||
if (newLeft == left && newRight == right) {
|
||||
return this;
|
||||
}
|
||||
return replaceChildren(newLeft, newRight);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a copy of this object with new left and right children. Used by
|
||||
* {@link #resolveAttributes(AttributeResolver)}.
|
||||
*/
|
||||
protected abstract BinaryProcessorDefinition replaceChildren(ProcessorDefinition left, ProcessorDefinition right);
|
||||
|
||||
@Override
|
||||
public boolean resolved() {
|
||||
return left().resolved() && right().resolved();
|
||||
|
|
|
@ -24,4 +24,9 @@ public class ConstantInput extends LeafInput<Object> {
|
|||
public final boolean supportedByAggsOnlyQuery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,4 +25,9 @@ public class HitExtractorInput extends LeafInput<HitExtractor> {
|
|||
public final boolean supportedByAggsOnlyQuery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public abstract class LeafInput<T> extends ProcessorDefinition {
|
|||
return false;
|
||||
}
|
||||
|
||||
AggPathInput other = (AggPathInput) obj;
|
||||
LeafInput<?> other = (LeafInput<?>) obj;
|
||||
return Objects.equals(context(), other.context())
|
||||
&& Objects.equals(expression(), other.expression());
|
||||
}
|
||||
|
|
|
@ -7,8 +7,10 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definit
|
|||
|
||||
import org.elasticsearch.xpack.sql.execution.search.FieldExtraction;
|
||||
import org.elasticsearch.xpack.sql.execution.search.SqlSourceBuilder;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
import org.elasticsearch.xpack.sql.querydsl.container.ColumnReference;
|
||||
import org.elasticsearch.xpack.sql.tree.Node;
|
||||
|
||||
import java.util.List;
|
||||
|
@ -34,4 +36,9 @@ public abstract class ProcessorDefinition extends Node<ProcessorDefinition> impl
|
|||
public void collectFields(SqlSourceBuilder sourceBuilder) {
|
||||
// No fields needed
|
||||
}
|
||||
|
||||
public abstract ProcessorDefinition resolveAttributes(AttributeResolver resolver);
|
||||
public interface AttributeResolver {
|
||||
ColumnReference resolve(Attribute attribute);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import org.elasticsearch.xpack.sql.expression.Expression;
|
|||
import org.elasticsearch.xpack.sql.querydsl.container.ColumnReference;
|
||||
|
||||
public class ReferenceInput extends NonExecutableInput<ColumnReference> {
|
||||
|
||||
public ReferenceInput(Expression expression, ColumnReference context) {
|
||||
super(expression, context);
|
||||
}
|
||||
|
@ -18,4 +17,9 @@ public class ReferenceInput extends NonExecutableInput<ColumnReference> {
|
|||
public final boolean supportedByAggsOnlyQuery() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,4 +38,9 @@ public class ScoreProcessorDefinition extends ProcessorDefinition {
|
|||
public boolean supportedByAggsOnlyQuery() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import java.util.Objects;
|
|||
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
public class UnaryProcessorDefinition extends ProcessorDefinition {
|
||||
public final class UnaryProcessorDefinition extends ProcessorDefinition {
|
||||
|
||||
private final ProcessorDefinition child;
|
||||
private final Processor action;
|
||||
|
@ -47,6 +47,15 @@ public class UnaryProcessorDefinition extends ProcessorDefinition {
|
|||
return child.supportedByAggsOnlyQuery();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
ProcessorDefinition newChild = child.resolveAttributes(resolver);
|
||||
if (newChild == child) {
|
||||
return this;
|
||||
}
|
||||
return new UnaryProcessorDefinition(expression(), newChild, action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(expression(), child, action);
|
||||
|
|
|
@ -233,18 +233,28 @@ public class QueryContainer {
|
|||
}
|
||||
proc = sfa.processorDef();
|
||||
}
|
||||
AtomicReference<QueryContainer> containerRef = new AtomicReference<>(this);
|
||||
|
||||
// find the processor inputs (Attributes) and convert them into references
|
||||
// no need to promote them to the top since the container doesn't have to be aware
|
||||
proc = proc.transformUp(l -> {
|
||||
Attribute attr = aliases.getOrDefault(l.context(), l.context());
|
||||
Tuple<QueryContainer, ColumnReference> ref = containerRef.get().toReference(attr);
|
||||
containerRef.set(ref.v1());
|
||||
return new ReferenceInput(l.expression(), ref.v2());
|
||||
}, AttributeInput.class);
|
||||
class QueryAttributeResolver implements ProcessorDefinition.AttributeResolver {
|
||||
private QueryContainer container;
|
||||
|
||||
private QueryAttributeResolver(QueryContainer container) {
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnReference resolve(Attribute attribute) {
|
||||
Attribute attr = aliases.getOrDefault(attribute, attribute);
|
||||
Tuple<QueryContainer, ColumnReference> ref = container.toReference(attr);
|
||||
container = ref.v1();
|
||||
return ref.v2();
|
||||
}
|
||||
}
|
||||
QueryAttributeResolver resolver = new QueryAttributeResolver(this);
|
||||
proc = proc.resolveAttributes(resolver);
|
||||
QueryContainer qContainer = resolver.container;
|
||||
|
||||
QueryContainer qContainer = containerRef.get();
|
||||
// update proc
|
||||
Map<Attribute, ProcessorDefinition> procs = new LinkedHashMap<>(qContainer.scalarFunctions());
|
||||
procs.put(name, proc);
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.Attribute;
|
||||
import org.elasticsearch.xpack.sql.expression.Expression;
|
||||
import org.elasticsearch.xpack.sql.querydsl.container.ColumnReference;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class AttributeInputTests extends ESTestCase {
|
||||
public void testResolveAttributes() {
|
||||
ColumnReference column = mock(ColumnReference.class);
|
||||
Expression expression = mock(Expression.class);
|
||||
Attribute attribute = mock(Attribute.class);
|
||||
|
||||
ReferenceInput expected = new ReferenceInput(expression, column);
|
||||
|
||||
assertEquals(expected, new AttributeInput(expression, attribute).resolveAttributes(a -> {
|
||||
assertSame(attribute, a);
|
||||
return column;
|
||||
}));
|
||||
}
|
||||
}
|
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ProcessorDefinition.AttributeResolver;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
@ -15,25 +16,58 @@ public class BinaryProcessorDefinitionTests extends ESTestCase {
|
|||
ProcessorDefinition supported = new DummyProcessorDefinition(true);
|
||||
ProcessorDefinition unsupported = new DummyProcessorDefinition(false);
|
||||
|
||||
assertFalse(newBinaryProcessor(unsupported, unsupported).supportedByAggsOnlyQuery());
|
||||
assertFalse(newBinaryProcessor(unsupported, supported).supportedByAggsOnlyQuery());
|
||||
assertFalse(newBinaryProcessor(supported, unsupported).supportedByAggsOnlyQuery());
|
||||
assertTrue(newBinaryProcessor(supported, supported).supportedByAggsOnlyQuery());
|
||||
assertFalse(new DummyBinaryProcessorDefinition(unsupported, unsupported).supportedByAggsOnlyQuery());
|
||||
assertFalse(new DummyBinaryProcessorDefinition(unsupported, supported).supportedByAggsOnlyQuery());
|
||||
assertFalse(new DummyBinaryProcessorDefinition(supported, unsupported).supportedByAggsOnlyQuery());
|
||||
assertTrue(new DummyBinaryProcessorDefinition(supported, supported).supportedByAggsOnlyQuery());
|
||||
}
|
||||
|
||||
private ProcessorDefinition newBinaryProcessor(ProcessorDefinition left, ProcessorDefinition right) {
|
||||
return new BinaryProcessorDefinition(null, left, right) {
|
||||
public void testResolveAttributes() {
|
||||
ProcessorDefinition needsNothing = new DummyProcessorDefinition(randomBoolean());
|
||||
ProcessorDefinition resolvesTo = new DummyProcessorDefinition(randomBoolean());
|
||||
ProcessorDefinition needsResolution = new DummyProcessorDefinition(randomBoolean()) {
|
||||
@Override
|
||||
public Processor asProcessor() {
|
||||
return null;
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return resolvesTo;
|
||||
}
|
||||
};
|
||||
AttributeResolver resolver = a -> {
|
||||
fail("not exepected");
|
||||
return null;
|
||||
};
|
||||
|
||||
ProcessorDefinition d = new DummyBinaryProcessorDefinition(needsNothing, needsNothing);
|
||||
assertSame(d, d.resolveAttributes(resolver));
|
||||
|
||||
d = new DummyBinaryProcessorDefinition(needsNothing, needsResolution);
|
||||
ProcessorDefinition expected = new DummyBinaryProcessorDefinition(needsNothing, resolvesTo);
|
||||
assertEquals(expected, d.resolveAttributes(resolver));
|
||||
|
||||
d = new DummyBinaryProcessorDefinition(needsResolution, needsNothing);
|
||||
expected = new DummyBinaryProcessorDefinition(resolvesTo, needsNothing);
|
||||
assertEquals(expected, d.resolveAttributes(resolver));
|
||||
}
|
||||
|
||||
static class DummyProcessorDefinition extends ProcessorDefinition {
|
||||
public static final class DummyBinaryProcessorDefinition extends BinaryProcessorDefinition {
|
||||
public DummyBinaryProcessorDefinition(ProcessorDefinition left, ProcessorDefinition right) {
|
||||
super(null, left, right);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor asProcessor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BinaryProcessorDefinition replaceChildren(ProcessorDefinition left, ProcessorDefinition right) {
|
||||
return new DummyBinaryProcessorDefinition(left, right);
|
||||
}
|
||||
}
|
||||
|
||||
public static class DummyProcessorDefinition extends ProcessorDefinition {
|
||||
private final boolean supportedByAggsOnlyQuery;
|
||||
|
||||
DummyProcessorDefinition(boolean supportedByAggsOnlyQuery) {
|
||||
public DummyProcessorDefinition(boolean supportedByAggsOnlyQuery) {
|
||||
super(null, emptyList());
|
||||
this.supportedByAggsOnlyQuery = supportedByAggsOnlyQuery;
|
||||
}
|
||||
|
@ -52,5 +86,10 @@ public class BinaryProcessorDefinitionTests extends ESTestCase {
|
|||
public Processor asProcessor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.processor.definit
|
|||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.BinaryProcessorDefinitionTests.DummyProcessorDefinition;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.definition.ProcessorDefinition.AttributeResolver;
|
||||
import org.elasticsearch.xpack.sql.expression.function.scalar.processor.runtime.Processor;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
|
@ -20,6 +21,28 @@ public class UnaryProcessorDefinitionTests extends ESTestCase {
|
|||
assertTrue(newUnaryProcessor(supported).supportedByAggsOnlyQuery());
|
||||
}
|
||||
|
||||
public void testResolveAttributes() {
|
||||
ProcessorDefinition needsNothing = new DummyProcessorDefinition(randomBoolean());
|
||||
ProcessorDefinition resolvesTo = new DummyProcessorDefinition(randomBoolean());
|
||||
ProcessorDefinition needsResolution = new DummyProcessorDefinition(randomBoolean()) {
|
||||
@Override
|
||||
public ProcessorDefinition resolveAttributes(AttributeResolver resolver) {
|
||||
return resolvesTo;
|
||||
}
|
||||
};
|
||||
AttributeResolver resolver = a -> {
|
||||
fail("not exepected");
|
||||
return null;
|
||||
};
|
||||
|
||||
ProcessorDefinition d = newUnaryProcessor(needsNothing);
|
||||
assertSame(d, d.resolveAttributes(resolver));
|
||||
|
||||
d = newUnaryProcessor(needsResolution);
|
||||
ProcessorDefinition expected = newUnaryProcessor(resolvesTo);
|
||||
assertEquals(expected, d.resolveAttributes(resolver));
|
||||
}
|
||||
|
||||
private ProcessorDefinition newUnaryProcessor(ProcessorDefinition child) {
|
||||
return new UnaryProcessorDefinition(null, child, null);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue