diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java index 9f18db3c920..5af9d9ec747 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.eql.analysis; +import org.elasticsearch.xpack.eql.common.Failure; import org.elasticsearch.xpack.ql.expression.Attribute; import org.elasticsearch.xpack.ql.expression.NamedExpression; import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute; diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java index ac7800db056..afb3297ab2e 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java @@ -7,23 +7,14 @@ package org.elasticsearch.xpack.eql.analysis; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xpack.eql.EqlClientException; -import org.elasticsearch.xpack.ql.tree.Location; -import org.elasticsearch.xpack.ql.util.StringUtils; +import org.elasticsearch.xpack.eql.common.Failure; import java.util.Collection; -import java.util.stream.Collectors; public class VerificationException extends EqlClientException { protected VerificationException(Collection sources) { - super(asMessage(sources)); - } - - private static String asMessage(Collection failures) { - return failures.stream().map(f -> { - Location l = f.node().source().source(); - return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message(); - }).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY)); + super(Failure.failMessage(sources)); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java index 071bdb0cb00..66c363fe844 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.eql.analysis; +import org.elasticsearch.xpack.eql.common.Failure; import org.elasticsearch.xpack.ql.capabilities.Unresolvable; import org.elasticsearch.xpack.ql.expression.Attribute; import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute; @@ -22,7 +23,7 @@ import java.util.Map; import java.util.Set; import static java.util.stream.Collectors.toMap; -import static org.elasticsearch.xpack.eql.analysis.Failure.fail; +import static org.elasticsearch.xpack.eql.common.Failure.fail; /** * The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check. diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/common/Failure.java similarity index 56% rename from x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java rename to x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/common/Failure.java index d4dadb5be64..2172f6f5752 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/common/Failure.java @@ -4,29 +4,33 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.eql.analysis; +package org.elasticsearch.xpack.eql.common; +import org.elasticsearch.xpack.ql.tree.Location; import org.elasticsearch.xpack.ql.tree.Node; +import org.elasticsearch.xpack.ql.util.StringUtils; +import java.util.Collection; import java.util.Objects; +import java.util.stream.Collectors; import static org.elasticsearch.common.logging.LoggerMessageFormat.format; -class Failure { +public class Failure { private final Node node; private final String message; - Failure(Node node, String message) { + public Failure(Node node, String message) { this.node = node; this.message = message; } - Node node() { + public Node node() { return node; } - String message() { + public String message() { return message; } @@ -54,7 +58,16 @@ class Failure { return message; } - static Failure fail(Node source, String message, Object... args) { + public static Failure fail(Node source, String message, Object... args) { return new Failure(source, format(message, args)); } + + public static String failMessage(Collection failures) { + return failures.stream().map(f -> { + Location l = f.node().source().source(); + return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message(); + }).collect(Collectors.joining(StringUtils.NEW_LINE, + format("Found {} problem{}\n", failures.size(), failures.size() > 1 ? "s" : StringUtils.EMPTY), + StringUtils.EMPTY)); + } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java new file mode 100644 index 00000000000..c7451b44595 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/EsQueryExec.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +public class EsQueryExec extends LeafExec { + + private final String index; + private final List output; + private final QueryContainer queryContainer; + + public EsQueryExec(Source source, String index, List output, QueryContainer queryContainer) { + super(source); + this.index = index; + this.output = output; + this.queryContainer = queryContainer; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, EsQueryExec::new, index, output, queryContainer); + } + + public EsQueryExec with(QueryContainer queryContainer) { + return new EsQueryExec(source(), index, output, queryContainer); + } + + public String index() { + return index; + } + + @Override + public List output() { + return output; + } + + @Override + public void execute(EqlSession session, ActionListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + return Objects.hash(index, queryContainer, output); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EsQueryExec other = (EsQueryExec) obj; + return Objects.equals(index, other.index) + && Objects.equals(queryContainer, other.queryContainer) + && Objects.equals(output, other.output); + } + + @Override + public String nodeString() { + return nodeName() + "[" + index + "," + queryContainer + "]"; + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/FilterExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/FilterExec.java new file mode 100644 index 00000000000..fc4b104d50a --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/FilterExec.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.expression.Expression; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +public class FilterExec extends UnaryExec implements Unexecutable { + + private final Expression condition; + private final boolean onAggs; + + public FilterExec(Source source, PhysicalPlan child, Expression condition) { + this(source, child, condition, false); + } + + public FilterExec(Source source, PhysicalPlan child, Expression condition, boolean onAggs) { + super(source, child); + this.condition = condition; + this.onAggs = onAggs; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, FilterExec::new, child(), condition, onAggs); + } + + @Override + protected FilterExec replaceChild(PhysicalPlan newChild) { + return new FilterExec(source(), newChild, condition, onAggs); + } + + public Expression condition() { + return condition; + } + + public boolean onAggs() { + return onAggs; + } + + @Override + public List output() { + return child().output(); + } + + @Override + public int hashCode() { + return Objects.hash(condition, onAggs, child()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + FilterExec other = (FilterExec) obj; + return onAggs == other.onAggs + && Objects.equals(condition, other.condition) + && Objects.equals(child(), other.child()); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LeafExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LeafExec.java new file mode 100644 index 00000000000..9f39002e925 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LeafExec.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.Collections; +import java.util.List; + +public abstract class LeafExec extends PhysicalPlan { + + protected LeafExec(Source source) { + super(source, Collections.emptyList()); + } + + @Override + public final LeafExec replaceChildren(List newChildren) { + throw new UnsupportedOperationException("this type of node doesn't have any children to replace"); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LimitExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LimitExec.java new file mode 100644 index 00000000000..ff166851555 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LimitExec.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.ql.expression.Expression; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.Objects; + +public class LimitExec extends UnaryExec implements Unexecutable { + + private final Expression limit; + + public LimitExec(Source source, PhysicalPlan child, Expression limit) { + super(source, child); + this.limit = limit; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LimitExec::new, child(), limit); + } + + @Override + protected LimitExec replaceChild(PhysicalPlan newChild) { + return new LimitExec(source(), newChild, limit); + } + + public Expression limit() { + return limit; + } + + @Override + public int hashCode() { + return Objects.hash(limit, child()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + LimitExec other = (LimitExec) obj; + return Objects.equals(limit, other.limit) + && Objects.equals(child(), other.child()); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalExec.java new file mode 100644 index 00000000000..48bd65d1a1c --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalExec.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.eql.session.EmptyExecutable; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Executable; +import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +public class LocalExec extends LeafExec { + + private final Executable executable; + + public LocalExec(Source source, Executable executable) { + super(source); + this.executable = executable; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LocalExec::new, executable); + } + + public Executable executable() { + return executable; + } + + @Override + public List output() { + return executable.output(); + } + + public boolean isEmpty() { + return executable instanceof EmptyExecutable; + } + + @Override + public void execute(EqlSession session, ActionListener listener) { + executable.execute(session, listener); + } + + @Override + public int hashCode() { + return Objects.hash(executable); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + LocalExec other = (LocalExec) obj; + return Objects.equals(executable, other.executable); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java new file mode 100644 index 00000000000..cb30beeb690 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/LocalRelation.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Executable; +import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +import static java.util.Collections.emptyList; + +public class LocalRelation extends LogicalPlan implements Executable { + + private final Executable executable; + + public LocalRelation(Source source, Executable executable) { + super(source, emptyList()); + this.executable = executable; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, LocalRelation::new, executable); + } + + @Override + public LogicalPlan replaceChildren(List newChildren) { + throw new UnsupportedOperationException("this type of node doesn't have any children to replace"); + } + + public Executable executable() { + return executable; + } + + @Override + public boolean expressionsResolved() { + return true; + } + + @Override + public List output() { + return executable.output(); + } + + @Override + public void execute(EqlSession session, ActionListener listener) { + executable.execute(session, listener); + } + + @Override + public int hashCode() { + return executable.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + LocalRelation other = (LocalRelation) obj; + return Objects.equals(executable, other.executable); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/OrderExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/OrderExec.java new file mode 100644 index 00000000000..541c64070c3 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/OrderExec.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.ql.expression.Order; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +public class OrderExec extends UnaryExec implements Unexecutable { + + private final List order; + + public OrderExec(Source source, PhysicalPlan child, List order) { + super(source, child); + this.order = order; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, OrderExec::new, child(), order); + } + + @Override + protected OrderExec replaceChild(PhysicalPlan newChild) { + return new OrderExec(source(), newChild, order); + } + + public List order() { + return order; + } + + @Override + public int hashCode() { + return Objects.hash(order, child()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + OrderExec other = (OrderExec) obj; + + return Objects.equals(order, other.order) + && Objects.equals(child(), other.child()); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnaryExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnaryExec.java new file mode 100644 index 00000000000..26389c065d3 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnaryExec.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public abstract class UnaryExec extends PhysicalPlan { + + private final PhysicalPlan child; + + protected UnaryExec(Source source, PhysicalPlan child) { + super(source, Collections.singletonList(child)); + this.child = child; + } + + @Override + public final PhysicalPlan replaceChildren(List newChildren) { + if (newChildren.size() != 1) { + throw new IllegalArgumentException("expected [1] child but received [" + newChildren.size() + "]"); + } + return replaceChild(newChildren.get(0)); + } + + protected abstract UnaryExec replaceChild(PhysicalPlan newChild); + + public PhysicalPlan child() { + return child; + } + + @Override + public List output() { + return child.output(); + } + + @Override + public int hashCode() { + return Objects.hashCode(child()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + UnaryExec other = (UnaryExec) obj; + + return Objects.equals(child, other.child); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/Unexecutable.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/Unexecutable.java new file mode 100644 index 00000000000..69a6ca51271 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/Unexecutable.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.eql.planner.PlanningException; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Executable; +import org.elasticsearch.xpack.eql.session.Results; + + +// this is mainly a marker interface to validate a plan before being executed +public interface Unexecutable extends Executable { + + @Override + default void execute(EqlSession session, ActionListener listener) { + throw new PlanningException("Current plan {} is not executable", this); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnplannedExec.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnplannedExec.java new file mode 100644 index 00000000000..45061b3f961 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/UnplannedExec.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.eql.planner.PlanningException; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.tree.NodeInfo; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; +import java.util.Objects; + +public class UnplannedExec extends LeafExec implements Unexecutable { + + private final LogicalPlan plan; + + public UnplannedExec(Source source, LogicalPlan plan) { + super(source); + this.plan = plan; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, UnplannedExec::new, plan); + } + + public LogicalPlan plan() { + return plan; + } + + @Override + public List output() { + return plan.output(); + } + + @Override + public void execute(EqlSession session, ActionListener listener) { + throw new PlanningException("Current plan {} is not executable", this); + } + + @Override + public int hashCode() { + return plan.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + UnplannedExec other = (UnplannedExec) obj; + return Objects.equals(plan, other.plan); + } + + @Override + public String nodeString() { + return nodeName() + "[" + plan.nodeString() + "]"; + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java new file mode 100644 index 00000000000..225683a21d4 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Mapper.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.xpack.eql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.eql.plan.physical.FilterExec; +import org.elasticsearch.xpack.eql.plan.physical.LimitExec; +import org.elasticsearch.xpack.eql.plan.physical.LocalExec; +import org.elasticsearch.xpack.eql.plan.physical.LocalRelation; +import org.elasticsearch.xpack.eql.plan.physical.OrderExec; +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.eql.plan.physical.UnplannedExec; +import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.plan.logical.EsRelation; +import org.elasticsearch.xpack.ql.plan.logical.Filter; +import org.elasticsearch.xpack.ql.plan.logical.Limit; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.plan.logical.OrderBy; +import org.elasticsearch.xpack.ql.rule.Rule; +import org.elasticsearch.xpack.ql.rule.RuleExecutor; +import org.elasticsearch.xpack.ql.util.ReflectionUtils; + +import java.util.Arrays; +import java.util.List; + +class Mapper extends RuleExecutor { + + PhysicalPlan map(LogicalPlan plan) { + return execute(planLater(plan)); + } + + @Override + protected Iterable.Batch> batches() { + Batch conversion = new Batch("Mapping", new SimpleExecMapper()); + + return Arrays.asList(conversion); + } + + private static PhysicalPlan planLater(LogicalPlan plan) { + return new UnplannedExec(plan.source(), plan); + } + + private static class SimpleExecMapper extends MapExecRule { + + @Override + protected PhysicalPlan map(LogicalPlan p) { + + if (p instanceof LocalRelation) { + return new LocalExec(p.source(), ((LocalRelation) p).executable()); + } + + if (p instanceof Filter) { + Filter fl = (Filter) p; + return new FilterExec(p.source(), map(fl.child()), fl.condition()); + } + + if (p instanceof OrderBy) { + OrderBy o = (OrderBy) p; + return new OrderExec(p.source(), map(o.child()), o.order()); + } + + if (p instanceof Limit) { + Limit l = (Limit) p; + return new LimitExec(p.source(), map(l.child()), l.limit()); + } + + if (p instanceof EsRelation) { + EsRelation c = (EsRelation) p; + List output = c.output(); + QueryContainer container = new QueryContainer(); + if (c.frozen()) { + container = container.withFrozen(); + } + return new EsQueryExec(p.source(), c.index().name(), output, container); + } + + return planLater(p); + } + } + + abstract static class MapExecRule extends Rule { + + private final Class subPlanToken = ReflectionUtils.detectSuperTypeForRuleLike(getClass()); + + @Override + public final PhysicalPlan apply(PhysicalPlan plan) { + return plan.transformUp(this::rule, UnplannedExec.class); + } + + @SuppressWarnings("unchecked") + @Override + protected final PhysicalPlan rule(UnplannedExec plan) { + LogicalPlan subPlan = plan.plan(); + if (subPlanToken.isInstance(subPlan)) { + return map((SubPlan) subPlan); + } + return plan; + } + + protected abstract PhysicalPlan map(SubPlan plan); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java index 0eb373d6809..b0bcce4d72f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java @@ -6,12 +6,42 @@ package org.elasticsearch.xpack.eql.planner; +import org.elasticsearch.xpack.eql.common.Failure; import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import java.util.List; + public class Planner { + private final Mapper mapper = new Mapper(); + private final QueryFolder folder = new QueryFolder(); + public PhysicalPlan plan(LogicalPlan plan) { - throw new UnsupportedOperationException(); + return foldPlan(mapPlan(plan)); + } + + PhysicalPlan mapPlan(LogicalPlan plan) { + return verifyMappingPlan(mapper.map(plan)); + } + + PhysicalPlan foldPlan(PhysicalPlan mapped) { + return verifyExecutingPlan(folder.fold(mapped)); + } + + PhysicalPlan verifyMappingPlan(PhysicalPlan plan) { + List failures = Verifier.verifyMappingPlan(plan); + if (failures.isEmpty() == false) { + throw new PlanningException(failures); + } + return plan; + } + + PhysicalPlan verifyExecutingPlan(PhysicalPlan plan) { + List failures = Verifier.verifyExecutingPlan(plan); + if (failures.isEmpty() == false) { + throw new PlanningException(failures); + } + return plan; } } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/PlanningException.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/PlanningException.java new file mode 100644 index 00000000000..d2f959c0e51 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/PlanningException.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.eql.EqlClientException; +import org.elasticsearch.xpack.eql.common.Failure; + +import java.util.Collection; + +public class PlanningException extends EqlClientException { + + public PlanningException(String message, Object... args) { + super(message, args); + } + + protected PlanningException(Collection sources) { + super(Failure.failMessage(sources)); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/QueryFolder.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/QueryFolder.java new file mode 100644 index 00000000000..c43b71ac9e8 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/QueryFolder.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.ql.rule.RuleExecutor; + +import static java.util.Collections.emptyList; + +class QueryFolder extends RuleExecutor { + + PhysicalPlan fold(PhysicalPlan plan) { + return execute(plan); + } + + @Override + protected Iterable.Batch> batches() { + return emptyList(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Verifier.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Verifier.java new file mode 100644 index 00000000000..de4a68f28cf --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Verifier.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.xpack.eql.common.Failure; +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.eql.plan.physical.Unexecutable; +import org.elasticsearch.xpack.eql.plan.physical.UnplannedExec; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.eql.common.Failure.fail; + +abstract class Verifier { + + static List verifyMappingPlan(PhysicalPlan plan) { + List failures = new ArrayList<>(); + + plan.forEachUp(p -> { + if (p instanceof UnplannedExec) { + failures.add(fail(p, "Unplanned item")); + } + p.forEachExpressionsUp(e -> { + if (e.childrenResolved() && !e.resolved()) { + failures.add(fail(e, "Unresolved expression")); + } + }); + }); + return failures; + } + + static List verifyExecutingPlan(PhysicalPlan plan) { + List failures = new ArrayList<>(); + + plan.forEachUp(p -> { + if (p instanceof Unexecutable) { + failures.add(fail(p, "Unexecutable item")); + } + p.forEachExpressionsUp(e -> { + if (e.childrenResolved() && !e.resolved()) { + failures.add(fail(e, "Unresolved expression")); + } + }); + }); + + return failures; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java new file mode 100644 index 00000000000..05a54cfd86e --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/querydsl/container/QueryContainer.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.querydsl.container; + +public class QueryContainer { + + public QueryContainer withFrozen() { + throw new UnsupportedOperationException(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyExecutable.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyExecutable.java new file mode 100644 index 00000000000..e32cf3c34c2 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EmptyExecutable.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.session; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.ql.expression.Attribute; + +import java.util.List; +import java.util.Objects; + +public class EmptyExecutable implements Executable { + + private final List output; + + public EmptyExecutable(List output) { + this.output = output; + } + + @Override + public List output() { + return output; + } + + @Override + public void execute(EqlSession session, ActionListener listener) { + listener.onResponse(Results.EMPTY); + } + + @Override + public int hashCode() { + return output.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + EmptyExecutable other = (EmptyExecutable) obj; + return Objects.equals(output, other.output); + } + + @Override + public String toString() { + return output.toString(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java index b0277e3b793..53c2859313a 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java @@ -7,13 +7,17 @@ package org.elasticsearch.xpack.eql.session; import org.apache.lucene.search.TotalHits; +import org.apache.lucene.search.TotalHits.Relation; import java.util.List; +import static java.util.Collections.emptyList; + public class Results { - private final TotalHits totalHits; + public static final Results EMPTY = new Results(new TotalHits(0, Relation.EQUAL_TO), emptyList()); + private final TotalHits totalHits; private final List results; public Results(TotalHits totalHits, List results) { diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java index 64f5328b5d1..ac339db8f1e 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java @@ -38,7 +38,7 @@ public class VerifierTests extends ESTestCase { private String error(IndexResolution resolution, String eql) { VerificationException e = expectThrows(VerificationException.class, () -> accept(resolution, eql)); assertTrue(e.getMessage().startsWith("Found ")); - String header = "Found 1 problem(s)\nline "; + String header = "Found 1 problem\nline "; return e.getMessage().substring(header.length()); } diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderTests.java new file mode 100644 index 00000000000..147cabbfdb5 --- /dev/null +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/planner/QueryFolderTests.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.eql.analysis.Analyzer; +import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; +import org.elasticsearch.xpack.eql.analysis.Verifier; +import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry; +import org.elasticsearch.xpack.eql.optimizer.Optimizer; +import org.elasticsearch.xpack.eql.parser.EqlParser; +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.ql.QlClientException; +import org.elasticsearch.xpack.ql.index.EsIndex; +import org.elasticsearch.xpack.ql.index.IndexResolution; + +import static org.elasticsearch.xpack.ql.type.TypesTests.loadMapping; + +public class QueryFolderTests extends ESTestCase { + + private EqlParser parser = new EqlParser(); + private PreAnalyzer preAnalyzer = new PreAnalyzer(); + private Analyzer analyzer = new Analyzer(new EqlFunctionRegistry(), new Verifier()); + private Optimizer optimizer = new Optimizer(); + private Planner planner = new Planner(); + + private IndexResolution index = IndexResolution.valid(new EsIndex("test", loadMapping("mapping-default.json"))); + + + private PhysicalPlan plan(IndexResolution resolution, String eql) { + return planner.plan(optimizer.optimize(analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement(eql), resolution)))); + } + + private PhysicalPlan plan(String eql) { + return plan(index, eql); + } + + public void testBasicPlan() throws Exception { + expectThrows(QlClientException.class, "not yet implemented", () -> plan("process where true")); + } +}