diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index c16e757630c..e97c2f8150e 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -98,6 +98,7 @@ New Features * SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble) +* SOLR-9009: Adds ability to get an Explanation of a Streaming Expression (Dennis Gove) Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 28b033ac15d..aa400461f00 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -49,7 +49,10 @@ import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; import org.apache.solr.client.solrj.io.stream.ExceptionStream; import org.apache.solr.client.solrj.io.stream.UniqueStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.metrics.*; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; @@ -1289,6 +1292,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe public void setStreamContext(StreamContext context) { stream.setStreamContext(context); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName("SQL LIMIT") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } public Tuple read() throws IOException { ++count; @@ -1348,6 +1364,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe children.add(stream); return children; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName("SQL HAVING") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } public void setStreamContext(StreamContext context) { stream.setStreamContext(context); @@ -1385,6 +1414,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe this.catalogs = new ArrayList<>(); this.catalogs.add(this.zkHost); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("SQL CATALOG") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } public Tuple read() throws IOException { Map fields = new HashMap<>(); @@ -1425,6 +1464,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe public void open() throws IOException { } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("SQL SCHEMA") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } public Tuple read() throws IOException { Map fields = new HashMap<>(); @@ -1470,6 +1519,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe } Collections.sort(this.tables); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("SQL TABLE") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } public Tuple read() throws IOException { Map fields = new HashMap<>(); @@ -1517,6 +1576,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe public void open() throws IOException { this.stream.open(); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName("SQL METADATA") + .withExpression("--non-expressible--") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR); + } // Return a metadata tuple as the first tuple and then pass through to the underlying stream. public Tuple read() throws IOException { diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 7c47c768338..6922fc490df 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -35,8 +35,11 @@ import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.GroupOperation; import org.apache.solr.client.solrj.io.ops.ReplaceOperation; import org.apache.solr.client.solrj.io.stream.*; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; @@ -191,6 +194,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, context.setSolrClientCache(clientCache); context.put("core", this.coreName); tupleStream.setStreamContext(context); + + // if asking for explanation then go get it + if(params.getBool("explain", false)){ + rsp.add("explanation", tupleStream.toExplanation(this.streamFactory)); + } + if(tupleStream instanceof DaemonStream) { DaemonStream daemonStream = (DaemonStream)tupleStream; if(daemons.containsKey(daemonStream.getId())) { @@ -271,6 +280,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, public List children() { return null; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("error") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression("--non-expressible--"); + } public Tuple read() { String msg = e.getMessage(); @@ -312,6 +331,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, return null; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("daemon-collection") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression("--non-expressible--"); + } + public Tuple read() { if(it.hasNext()) { return it.next().getInfo(); @@ -347,6 +376,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, return null; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("daemon-response") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression("--non-expressible--"); + } + public Tuple read() { if (sendEOF) { Map m = new HashMap(); @@ -391,6 +430,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, return this.tupleStream.children(); } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("timer") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression("--non-expressible--"); + } + public Tuple read() throws IOException { Tuple tuple = this.tupleStream.read(); if(tuple.EOF) { diff --git a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java index bde5759957e..e1b1dee15ca 100644 --- a/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java +++ b/solr/core/src/java/org/apache/solr/response/TextResponseWriter.java @@ -30,6 +30,7 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.util.BytesRef; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.common.EnumFieldValue; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; @@ -165,6 +166,8 @@ public abstract class TextResponseWriter { writeNamedList(name, (NamedList)val); } else if (val instanceof TupleStream) { writeTupleStream((TupleStream) val); + } else if (val instanceof Explanation){ + writeExplanation((Explanation) val); } else if (val instanceof Path) { writeStr(name, ((Path) val).toAbsolutePath().toString(), true); } else if (val instanceof Iterable) { @@ -316,6 +319,10 @@ public abstract class TextResponseWriter { tupleStream.writeStreamClose(writer); tupleStream.close(); } + + public void writeExplanation(Explanation explanation) throws IOException { + writeMap(null, explanation.toMap(), false, true); + } /** if this form of the method is called, val is the Java string form of a double */ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java index 10c7c294e6e..15af57aef93 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/FieldComparator.java @@ -16,9 +16,13 @@ */ package org.apache.solr.client.solrj.io.comp; +import java.io.IOException; import java.util.Map; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -29,6 +33,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class FieldComparator implements StreamComparator { private static final long serialVersionUID = 1; + private UUID comparatorNodeId = UUID.randomUUID(); private String leftFieldName; private String rightFieldName; @@ -84,6 +89,14 @@ public class FieldComparator implements StreamComparator { return new StreamExpressionValue(sb.toString()); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(comparatorNodeId.toString()) + .withExpressionType(ExpressionType.SORTER) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } /* * What're we doing here messing around with lambdas for the comparator logic? diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java index 44edc25f4ed..137054033f0 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/comp/MultipleFieldComparator.java @@ -19,12 +19,15 @@ package org.apache.solr.client.solrj.io.comp; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; /** @@ -34,6 +37,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class MultipleFieldComparator implements StreamComparator { private static final long serialVersionUID = 1; + private UUID comparatorNodeId = UUID.randomUUID(); private StreamComparator[] comps; @@ -91,6 +95,14 @@ public class MultipleFieldComparator implements StreamComparator { return false; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(comparatorNodeId.toString()) + .withExpressionType(ExpressionType.SORTER) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } @Override public MultipleFieldComparator copyAliased(Map aliases){ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java index 50b8e229684..e7e207edb3d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/FieldEqualitor.java @@ -16,15 +16,15 @@ */ package org.apache.solr.client.solrj.io.eq; -import java.io.Serializable; -import java.util.Comparator; +import java.io.IOException; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; -import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class FieldEqualitor implements StreamEqualitor { private static final long serialVersionUID = 1; + private UUID equalitorNodeId = UUID.randomUUID(); private String leftFieldName; private String rightFieldName; @@ -63,6 +64,14 @@ public class FieldEqualitor implements StreamEqualitor { return new StreamExpressionValue(sb.toString()); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(equalitorNodeId.toString()) + .withExpressionType(ExpressionType.EQUALITOR) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } public boolean test(Tuple leftTuple, Tuple rightTuple) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java index 982018e446d..76c261a8c0f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eq/MultipleFieldEqualitor.java @@ -17,14 +17,17 @@ package org.apache.solr.client.solrj.io.eq; import java.io.IOException; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; /** @@ -34,6 +37,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class MultipleFieldEqualitor implements StreamEqualitor { private static final long serialVersionUID = 1; + private UUID equalitorNodeId = UUID.randomUUID(); private StreamEqualitor[] eqs; @@ -60,6 +64,14 @@ public class MultipleFieldEqualitor implements StreamEqualitor { return new StreamExpressionValue(sb.toString()); } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(equalitorNodeId.toString()) + .withExpressionType(ExpressionType.EQUALITOR) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } @Override public boolean isDerivedFrom(StreamEqualitor base){ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java index 759aa0f8431..7ab6e973756 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.stream.*; @@ -36,12 +37,15 @@ import org.apache.solr.client.solrj.io.stream.metrics.*; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -50,7 +54,7 @@ public class GatherNodesStream extends TupleStream implements Expressible { private String zkHost; private String collection; private StreamContext streamContext; - private Map queryParams; + private Map queryParams; private String traverseFrom; private String traverseTo; private String gather; @@ -250,21 +254,30 @@ public class GatherNodesStream extends TupleStream implements Expressible { } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // collection expression.addParameter(collection); - if(tupleStream instanceof Expressible){ - expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + if(includeStreams){ + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + } + else{ + throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } - Set entries = queryParams.entrySet(); + Set> entries = queryParams.entrySet(); // parameters for(Map.Entry param : entries){ String value = param.getValue().toString(); @@ -300,6 +313,37 @@ public class GatherNodesStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.GRAPH_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + // one child is a stream + explanation.addChild(tupleStream.toExplanation(factory)); + + // one child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName("solr (graph)"); + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(queryParams.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + explanation.addChild(child); + + if(null != metrics){ + for(Metric metric : metrics){ + explanation.addHelper(metric.toExplanation(factory)); + } + } + + return explanation; + } + public void setStreamContext(StreamContext context) { this.traversal = (Traversal) context.get("traversal"); @@ -576,5 +620,15 @@ public class GatherNodesStream extends TupleStream implements Expressible { return new Tuple(map); } } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("non-expressible") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_SOURCE) + .withExpression("non-expressible"); + } } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java index bb9b09df3e8..7418e0f2523 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java @@ -30,18 +30,22 @@ import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.stream.*; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.SolrjNamedThreadFactory; @@ -61,7 +65,7 @@ public class ShortestPathStream extends TupleStream implements Expressible { private boolean found; private StreamContext streamContext; private int threads; - private Map queryParams; + private Map queryParams; public ShortestPathStream(String zkHost, String collection, @@ -221,7 +225,7 @@ public class ShortestPathStream extends TupleStream implements Expressible { // collection expression.addParameter(collection); - Set entries = queryParams.entrySet(); + Set> entries = queryParams.entrySet(); // parameters for(Map.Entry param : entries){ String value = param.getValue().toString(); @@ -243,6 +247,27 @@ public class ShortestPathStream extends TupleStream implements Expressible { expression.addParameter(new StreamExpressionNamedParameter("edge", fromField+"="+toField)); return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.GRAPH_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName("solr (graph)"); + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(queryParams.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + explanation.addChild(child); + + return explanation; + } public void setStreamContext(StreamContext context) { this.streamContext = context; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ConcatOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ConcatOperation.java index a31be43094d..19d5ef58ce1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ConcatOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ConcatOperation.java @@ -18,8 +18,11 @@ package org.apache.solr.client.solrj.io.ops; import java.io.IOException; import java.util.Locale; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -33,7 +36,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class ConcatOperation implements StreamOperation { private static final long serialVersionUID = 1; - + private UUID operationNodeId = UUID.randomUUID(); + private String[] fields; private String as; private String delim; @@ -94,4 +98,13 @@ public class ConcatOperation implements StreamOperation { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(operationNodeId.toString()) + .withExpressionType(ExpressionType.OPERATION) + .withFunctionName(factory.getFunctionName(getClass())) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java index 682fe3b9c39..f4af7b61e10 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/DistinctOperation.java @@ -16,32 +16,20 @@ */ package org.apache.solr.client.solrj.io.ops; -import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; -import org.apache.solr.client.solrj.io.comp.StreamComparator; -import org.apache.solr.client.solrj.io.eq.FieldEqualitor; -import org.apache.solr.client.solrj.io.eq.StreamEqualitor; -import org.apache.solr.client.solrj.io.stream.expr.Expressible; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; - import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.ArrayList; -import java.util.Locale; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.PriorityQueue; +import java.util.UUID; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class DistinctOperation implements ReduceOperation { private static final long serialVersionUID = 1L; + private UUID operationNodeId = UUID.randomUUID(); private Tuple current; public DistinctOperation(StreamExpression expression, StreamFactory factory) throws IOException { @@ -60,6 +48,15 @@ public class DistinctOperation implements ReduceOperation { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(operationNodeId.toString()) + .withExpressionType(ExpressionType.OPERATION) + .withFunctionName(factory.getFunctionName(getClass())) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } + public Tuple reduce() { // Return the tuple after setting current to null. This will ensure the next call to // operate stores that tuple diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java index 1687352374b..9ed5cbe29be 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java @@ -16,29 +16,34 @@ */ package org.apache.solr.client.solrj.io.ops; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.UUID; + import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; -import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; -import java.io.IOException; -import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.ArrayList; -import java.util.Locale; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.PriorityQueue; - public class GroupOperation implements ReduceOperation { + private static final long serialVersionUID = 1L; + private UUID operationNodeId = UUID.randomUUID(); + private PriorityQueue priorityQueue; private Comparator comp; private StreamComparator streamComparator; @@ -86,6 +91,18 @@ public class GroupOperation implements ReduceOperation { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(operationNodeId.toString()) + .withExpressionType(ExpressionType.OPERATION) + .withFunctionName(factory.getFunctionName(getClass())) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()) + .withHelpers(new Explanation[]{ + streamComparator.toExplanation(factory) + }); + } + public Tuple reduce() { Map map = new HashMap(); List list = new ArrayList(); @@ -122,6 +139,7 @@ public class GroupOperation implements ReduceOperation { } public int compare(Tuple t1, Tuple t2) { + // Couldn't this be comp.compare(t2,t1) ? return comp.compare(t1, t2)*(-1); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java index e569c0df7dc..27342dda264 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceOperation.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -80,4 +81,9 @@ public class ReplaceOperation implements StreamOperation { return replacer.toExpression(factory); } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException{ + return replacer.toExplanation(factory); + } + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java index 0dcdeceba02..99b808e437a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithFieldOperation.java @@ -18,13 +18,16 @@ package org.apache.solr.client.solrj.io.ops; import java.io.IOException; import java.util.Locale; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; /** * Implementation of replace(...., withField=fieldName) @@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class ReplaceWithFieldOperation implements StreamOperation { private static final long serialVersionUID = 1; + private UUID operationNodeId = UUID.randomUUID(); private boolean wasBuiltWithFieldName; private String originalFieldName; @@ -108,4 +112,12 @@ public class ReplaceWithFieldOperation implements StreamOperation { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(operationNodeId.toString()) + .withExpressionType(ExpressionType.OPERATION) + .withFunctionName(factory.getFunctionName(getClass())) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java index 8b74e2a0c28..0301defc0b4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/ReplaceWithValueOperation.java @@ -18,13 +18,16 @@ package org.apache.solr.client.solrj.io.ops; import java.io.IOException; import java.util.Locale; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; /** * Implementation of replace(...., withValue="some value") @@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class ReplaceWithValueOperation implements StreamOperation { private static final long serialVersionUID = 1; + private UUID operationNodeId = UUID.randomUUID(); private boolean wasBuiltWithFieldName; private String fieldName; @@ -113,4 +117,12 @@ public class ReplaceWithValueOperation implements StreamOperation { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(operationNodeId.toString()) + .withExpressionType(ExpressionType.OPERATION) + .withFunctionName(factory.getFunctionName(getClass())) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java index 643628fa95e..1f71901638a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/StreamOperation.java @@ -16,13 +16,10 @@ */ package org.apache.solr.client.solrj.io.ops; -import java.io.IOException; import java.io.Serializable; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.Expressible; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; /** * Interface for any operation one can perform on a tuple in a TupleStream diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java index c9c7b22e5ec..b619f2facfd 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java @@ -26,12 +26,13 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.Random; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; @@ -41,10 +42,12 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.common.cloud.ClusterState; @@ -154,7 +157,7 @@ public class CloudSolrStream extends TupleStream implements Expressible { } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException { // functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."]) // function name @@ -194,6 +197,29 @@ public class CloudSolrStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + if(null != params){ + child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + } + explanation.addChild(child); + + return explanation; + } + private void init(String collectionName, String zkHost, Map params) throws IOException { this.zkHost = zkHost; this.collection = collectionName; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java index bd3c2336d6d..a47f05ce07c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ComplementStream.java @@ -22,12 +22,13 @@ import java.util.List; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; -import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -88,23 +89,33 @@ public class ComplementStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(streamA instanceof Expressible){ - expression.addParameter(((Expressible)streamA).toExpression(factory)); + if(includeStreams){ + // streams + if(streamA instanceof Expressible){ + expression.addParameter(((Expressible)streamA).toExpression(factory)); + } + else{ + throw new IOException("This IntersectionStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + if(originalStreamB instanceof Expressible){ + expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This IntersectionStream contains a non-expressible TupleStream - it cannot be converted to an expression"); - } - - if(originalStreamB instanceof Expressible){ - expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); - } - else{ - throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); + expression.addParameter(""); } // on @@ -112,6 +123,22 @@ public class ComplementStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + streamA.toExplanation(factory), + originalStreamB.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelper(eq.toExplanation(factory)) + ; + } public void setStreamContext(StreamContext context) { this.streamA.setStreamContext(context); @@ -119,7 +146,7 @@ public class ComplementStream extends TupleStream implements Expressible { } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(streamA); l.add(streamB); return l; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java index 752ea7ce1d1..77648dfd738 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java @@ -15,23 +15,26 @@ * limitations under the License. */ package org.apache.solr.client.solrj.io.stream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; + import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.lang.invoke.MethodHandles; -import java.util.Locale; -import java.util.Map; -import java.util.HashMap; -import java.util.Date; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,15 +103,24 @@ public class DaemonStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(tupleStream instanceof Expressible){ - expression.addParameter(((Expressible)tupleStream).toExpression(factory)); - } else { - throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + if(includeStreams){ + // streams + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + } else { + throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + else{ + expression.addParameter(""); } expression.addParameter(new StreamExpressionNamedParameter("id", id)); @@ -117,6 +129,19 @@ public class DaemonStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[] { + tupleStream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } public int remainingCapacity() { return this.queue.remainingCapacity(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java index e442d1b564c..46041058f56 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java @@ -24,6 +24,10 @@ import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.common.SolrException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +74,16 @@ public class ExceptionStream extends TupleStream { return new Tuple(fields); } } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("non-expressible") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_SOURCE) + .withExpression("non-expressible"); + } public StreamComparator getStreamSort() { return this.stream.getStreamSort(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java index 3474257efc1..ceaf13ccf61 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java @@ -17,13 +17,14 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; -import java.util.HashMap; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Collections; import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; @@ -33,7 +34,10 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -58,7 +62,7 @@ public class FacetStream extends TupleStream implements Expressible { private Metric[] metrics; private int bucketSizeLimit; private FieldComparator[] bucketSorts; - private List tuples = new ArrayList(); + private List tuples = new ArrayList(); private int index; private String zkHost; private Map props; @@ -267,7 +271,31 @@ public class FacetStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + // TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a + // parallel stream. + + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + + explanation.addChild(child); + + return explanation; + } + public void setStreamContext(StreamContext context) { cache = context.getSolrClientCache(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java index 717e8e5d1a3..dfb678f63fc 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashJoinStream.java @@ -24,7 +24,10 @@ import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -99,17 +102,27 @@ public class HashJoinStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(hashStream instanceof Expressible && fullStream instanceof Expressible){ - expression.addParameter(((Expressible)fullStream).toExpression(factory)); - expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory))); + if(includeStreams){ + // streams + if(hashStream instanceof Expressible && fullStream instanceof Expressible){ + expression.addParameter(((Expressible)fullStream).toExpression(factory)); + expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory))); + } + else{ + throw new IOException("This HashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This HashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); + expression.addParameter("hashed="); } // on @@ -122,6 +135,20 @@ public class HashJoinStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + fullStream.toExplanation(factory), + hashStream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + } public void setStreamContext(StreamContext context) { this.hashStream.setStreamContext(context); @@ -129,7 +156,7 @@ public class HashJoinStream extends TupleStream implements Expressible { } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(hashStream); l.add(fullStream); return l; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java index 273ca60dd8d..3c04e466386 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/IntersectStream.java @@ -22,12 +22,13 @@ import java.util.List; import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; -import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -88,23 +89,33 @@ public class IntersectStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(streamA instanceof Expressible){ - expression.addParameter(((Expressible)streamA).toExpression(factory)); + if(includeStreams){ + // streams + if(streamA instanceof Expressible){ + expression.addParameter(((Expressible)streamA).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + if(originalStreamB instanceof Expressible){ + expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); + } + else{ + throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); - } - - if(originalStreamB instanceof Expressible){ - expression.addParameter(((Expressible)originalStreamB).toExpression(factory)); - } - else{ - throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); + expression.addParameter(""); } // on @@ -113,13 +124,28 @@ public class IntersectStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + streamA.toExplanation(factory), + originalStreamB.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelper(eq.toExplanation(factory)); + } + public void setStreamContext(StreamContext context) { this.streamA.setStreamContext(context); this.streamB.setStreamContext(context); } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(streamA); l.add(streamB); return l; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java index a98f2e42425..d1a301e1276 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java @@ -34,7 +34,10 @@ import java.util.Properties; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -348,6 +351,40 @@ public class JDBCStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + + StreamExpression expression = (StreamExpression)toExpression(factory); + explanation.setExpression(expression.toString()); + + String driverClassName = this.driverClassName; + if(null == driverClassName){ + try{ + driverClassName = DriverManager.getDriver(connectionUrl).getClass().getName(); + } + catch(Exception e){ + driverClassName = String.format(Locale.ROOT, "Failed to find driver for connectionUrl='%s'", connectionUrl); + } + } + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName("jdbc-source"); + child.setImplementingClass(driverClassName); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(sqlQuery); + + explanation.addChild(child); + + return explanation; + } + @Override public List children() { return new ArrayList(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java index 69df46304f7..acb7fe99a3a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JoinStream.java @@ -26,7 +26,10 @@ import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -87,13 +90,22 @@ public abstract class JoinStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // streams for (PushBackStream stream : streams) { - expression.addParameter(stream.toExpression(factory)); + if(includeStreams){ + expression.addParameter(stream.toExpression(factory)); + } + else{ + expression.addParameter(""); + } } // on @@ -107,6 +119,23 @@ public abstract class JoinStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_DECORATOR); + explanation.setExpression(toExpression(factory, false).toString()); + explanation.addHelper(eq.toExplanation(factory)); + + for(TupleStream stream : streams){ + explanation.addChild(stream.toExplanation(factory)); + } + + return explanation; + } + public void setStreamContext(StreamContext context) { for (PushBackStream stream : streams) { stream.setStreamContext(context); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java index 2ea926cbd58..c50ac779a48 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/MergeStream.java @@ -24,7 +24,10 @@ import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -95,13 +98,22 @@ public class MergeStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // streams for(PushBackStream stream : streams){ - expression.addParameter(stream.toExpression(factory)); + if(includeStreams){ + expression.addParameter(stream.toExpression(factory)); + } + else{ + expression.addParameter(""); + } } // on @@ -109,6 +121,23 @@ public class MergeStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_DECORATOR); + explanation.setExpression(toExpression(factory, false).toString()); + explanation.addHelper(comp.toExplanation(factory)); + + for(PushBackStream stream : streams){ + explanation.addChild(stream.toExplanation(factory)); + } + + return explanation; + } public void setStreamContext(StreamContext context) { for(PushBackStream stream : streams){ @@ -117,7 +146,7 @@ public class MergeStream extends TupleStream implements Expressible { } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); for(PushBackStream stream : streams){ l.add(stream); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index 28b1c6e7f05..957064300d2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -30,16 +30,20 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.Random; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; @@ -163,8 +167,11 @@ public class ParallelStream extends CloudSolrStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { - + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); @@ -174,12 +181,16 @@ public class ParallelStream extends CloudSolrStream implements Expressible { // workers expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers))); - // stream - if(tupleStream instanceof Expressible){ - expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + if(includeStreams){ + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + } + else{ + throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } // sort @@ -191,6 +202,24 @@ public class ParallelStream extends CloudSolrStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_DECORATOR); + explanation.setExpression(toExpression(factory, false).toString()); + + // add a child for each worker + for(int idx = 0; idx < workers; ++idx){ + explanation.addChild(tupleStream.toExplanation(factory)); + } + + return explanation; + } + public List children() { List l = new ArrayList(); l.add(tupleStream); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java index 527da21a046..b2b92a8a9ea 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PushBackStream.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -51,12 +52,16 @@ public class PushBackStream extends TupleStream implements Expressible { throw new IOException("This PushBackStream contains a non-expressible TupleStream - it cannot be converted to an expression"); } + public Explanation toExplanation(StreamFactory factory) throws IOException{ + return stream.toExplanation(factory); + } + public void setStreamContext(StreamContext context) { this.stream.setStreamContext(context); } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(stream); return l; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java index 3eb35c18618..246f09ecbb9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; import java.util.Random; import java.util.Iterator; @@ -31,12 +32,15 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrDocument; @@ -132,6 +136,28 @@ public class RandomStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + if(null != props){ + child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + } + explanation.addChild(child); + + return explanation; + } public void setStreamContext(StreamContext context) { cache = context.getSolrClientCache(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java index 5a8b2187f01..aaef8497252 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RankStream.java @@ -28,7 +28,10 @@ import java.util.PriorityQueue; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -101,19 +104,28 @@ public class RankStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // n expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size))); - // stream - if(stream instanceof Expressible){ - expression.addParameter(((Expressible)stream).toExpression(factory)); + if(includeStreams){ + // stream + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } // sort @@ -122,19 +134,33 @@ public class RankStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelper(comp.toExplanation(factory)); + } + public void setStreamContext(StreamContext context) { this.stream.setStreamContext(context); } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(stream); return l; } public void open() throws IOException { - this.top = new PriorityQueue(size, new ReverseComp(comp)); - this.topList = new LinkedList(); + this.top = new PriorityQueue(size, new ReverseComp(comp)); + this.topList = new LinkedList(); stream.open(); } @@ -187,6 +213,7 @@ public class RankStream extends TupleStream implements Expressible { class ReverseComp implements Comparator, Serializable { + private static final long serialVersionUID = 1L; private StreamComparator comp; public ReverseComp(StreamComparator comp) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java index 930c01dd421..69375c2ffae 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ReducerStream.java @@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; @@ -32,7 +30,10 @@ import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; import org.apache.solr.client.solrj.io.ops.ReduceOperation; import org.apache.solr.client.solrj.io.ops.StreamOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -134,12 +135,21 @@ public class ReducerStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // stream - expression.addParameter(stream.toExpression(factory)); + if(includeStreams){ + expression.addParameter(stream.toExpression(factory)); + } + else{ + expression.addParameter(""); + } // over if(eq instanceof Expressible){ @@ -158,12 +168,29 @@ public class ReducerStream extends TupleStream implements Expressible { return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelpers(new Explanation[]{ + eq.toExplanation(factory), + op.toExplanation(factory) + }); + } + public void setStreamContext(StreamContext context) { this.stream.setStreamContext(context); } public List children() { - List l = new ArrayList(); + List l = new ArrayList(); l.add(stream); return l; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java index 106651c5f88..c43f847807e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java @@ -17,23 +17,24 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.ArrayList; import org.apache.solr.client.solrj.io.Tuple; -import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.HashKey; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.metrics.Bucket; @@ -121,12 +122,21 @@ public class RollupStream extends TupleStream implements Expressible { } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); // stream - expression.addParameter(tupleStream.toExpression(factory)); + if(includeStreams){ + expression.addParameter(tupleStream.toExpression(factory)); + } + else{ + expression.addParameter(""); + } // over StringBuilder overBuilder = new StringBuilder(); @@ -143,6 +153,25 @@ public class RollupStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + Explanation explanation = new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + tupleStream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + + for(Metric metric : metrics){ + explanation.withHelper(metric.toExplanation(factory)); + } + + return explanation; + } public void setStreamContext(StreamContext context) { this.tupleStream.setStreamContext(context); @@ -176,7 +205,7 @@ public class RollupStream extends TupleStream implements Expressible { return tuple; } - Map map = new HashMap(); + Map map = new HashMap(); for(Metric metric : currentMetrics) { map.put(metric.getIdentifier(), metric.getValue()); } @@ -207,7 +236,7 @@ public class RollupStream extends TupleStream implements Expressible { } else { Tuple t = null; if(currentMetrics != null) { - Map map = new HashMap(); + Map map = new HashMap(); for(Metric metric : currentMetrics) { map.put(metric.getIdentifier(), metric.getValue()); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java index ff95f354d66..b0a1e05b3f0 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java @@ -26,7 +26,10 @@ import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.ops.StreamOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -110,16 +113,25 @@ public class SelectStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // stream - if(stream instanceof Expressible){ - expression.addParameter(((Expressible)stream).toExpression(factory)); + if(includeStreams){ + // stream + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } // selects @@ -138,6 +150,25 @@ public class SelectStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + Explanation explanation = new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()); + + for(StreamOperation operation : operations){ + explanation.addHelper(operation.toExplanation(factory)); + } + + return explanation; + } public void setStreamContext(StreamContext context) { this.stream.setStreamContext(context); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java index 7ff8a4e0810..007c6442240 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java @@ -19,15 +19,19 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; -import java.util.List; import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.slf4j.Logger; @@ -149,6 +153,16 @@ public class SolrStream extends TupleStream { return buf.toString(); } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("non-expressible") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_SOURCE) + .withExpression("non-expressible"); + } + /** * Closes the Stream to a single Solr Instance * */ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java index d9a8526059f..0410107ee16 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SortStream.java @@ -25,11 +25,14 @@ import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; /** @@ -106,16 +109,25 @@ public class SortStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(stream instanceof Expressible){ - expression.addParameter(((Expressible)stream).toExpression(factory)); + if(includeStreams){ + // streams + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } // by @@ -128,6 +140,20 @@ public class SortStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelper(comparator.toExplanation(factory)); + } public void setStreamContext(StreamContext context) { this.stream.setStreamContext(context); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java index 6ef49eaf04d..c128a027473 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java @@ -23,13 +23,17 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; @@ -149,6 +153,29 @@ public class StatsStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName(String.format(Locale.ROOT, "solr (worker ? of ?)")); + // TODO: fix this so we know the # of workers - check with Joel about a Stat's ability to be in a + // parallel stream. + + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + explanation.addChild(child); + + return explanation; + } public void setStreamContext(StreamContext context) { cache = context.getSolrClientCache(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java index e3434fc9131..1a6139ee427 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java @@ -23,33 +23,36 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Random; -import java.util.Set; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; @@ -61,7 +64,6 @@ public class TopicStream extends CloudSolrStream implements Expressible { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final long serialVersionUID = 1; private long count; @@ -69,7 +71,7 @@ public class TopicStream extends CloudSolrStream implements Expressible { private String id; protected long checkpointEvery; - private Map checkpoints = new HashMap(); + private Map checkpoints = new HashMap(); private String checkpointCollection; public TopicStream(String zkHost, @@ -178,7 +180,7 @@ public class TopicStream extends CloudSolrStream implements Expressible { } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); @@ -204,6 +206,42 @@ public class TopicStream extends CloudSolrStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory).toString()); + + { + // child 1 is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore"); + child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + // TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a + // parallel stream. + + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(","))); + explanation.addChild(child); + } + + { + // child 2 is a place where we store and read checkpoint info from + StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-checkpoint"); + child.setFunctionName(String.format(Locale.ROOT, "solr (checkpoint store)")); + child.setImplementingClass("Solr/Lucene"); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(String.format(Locale.ROOT, "id=%s, collection=%s, checkpointEvery=%d", id, checkpointCollection, checkpointEvery)); + explanation.addChild(child); + } + + return explanation; + } public List children() { List l = new ArrayList(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java index 76afa11d3b1..3f149a07a29 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java @@ -21,14 +21,19 @@ import java.io.IOException; import java.io.Serializable; import java.io.Writer; import java.util.List; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public abstract class TupleStream implements Closeable, Serializable { private static final long serialVersionUID = 1; + + private UUID streamNodeId = UUID.randomUUID(); public TupleStream() { @@ -54,8 +59,13 @@ public abstract class TupleStream implements Closeable, Serializable { public abstract StreamComparator getStreamSort(); + public abstract Explanation toExplanation(StreamFactory factory) throws IOException; + public int getCost() { return 0; } + public UUID getStreamNodeId(){ + return streamNodeId; + } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java index 83d38f499cf..21ef55a0377 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UniqueStream.java @@ -26,7 +26,10 @@ import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.eq.FieldEqualitor; import org.apache.solr.client.solrj.io.eq.StreamEqualitor; import org.apache.solr.client.solrj.io.ops.DistinctOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; @@ -85,16 +88,25 @@ public class UniqueStream extends TupleStream implements Expressible { } @Override - public StreamExpression toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // streams - if(originalStream instanceof Expressible){ - expression.addParameter(((Expressible)originalStream).toExpression(factory)); + if(includeStreams){ + // streams + if(originalStream instanceof Expressible){ + expression.addParameter(((Expressible)originalStream).toExpression(factory)); + } + else{ + throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } // over @@ -107,6 +119,21 @@ public class UniqueStream extends TupleStream implements Expressible { return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[] { + originalStream.toExplanation(factory) + // we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + .withHelper(originalEqualitor.toExplanation(factory)); + } public void setStreamContext(StreamContext context) { this.originalStream.setStreamContext(context); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java index 9ddd3811514..023ff56e2f4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java @@ -30,10 +30,12 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.common.SolrInputDocument; @@ -143,21 +145,55 @@ public class UpdateStream extends TupleStream implements Expressible { } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); expression.addParameter(collection); expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(updateBatchSize))); - if(tupleSource instanceof Expressible){ - expression.addParameter(((Expressible)tupleSource).toExpression(factory)); - } else { - throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + if(includeStreams){ + if(tupleSource instanceof Expressible){ + expression.addParameter(((Expressible)tupleSource).toExpression(factory)); + } else { + throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } + else{ + expression.addParameter(""); } return expression; } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + // An update stream is backward wrt the order in the explanation. This stream is the "child" + // while the collection we're updating is the parent. + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore"); + + explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection)); + explanation.setImplementingClass("Solr/Lucene"); + explanation.setExpressionType(ExpressionType.DATASTORE); + explanation.setExpression("Update into " + collection); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId().toString()); + child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass()))); + child.setImplementingClass(getClass().getName()); + child.setExpressionType(ExpressionType.STREAM_DECORATOR); + child.setExpression(toExpression(factory, false).toString()); + + explanation.addChild(child); + + return explanation; + } + @Override public void setStreamContext(StreamContext context) { this.cache = context.getSolrClientCache(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Explanation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Explanation.java new file mode 100644 index 00000000000..5f028aa34fd --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Explanation.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream.expr; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * Explanation containing details about a expression + */ +public class Explanation { + + private String expressionNodeId; + private String expressionType; + private String functionName; + private String implementingClass; + private String expression; + private String note; + private List helpers; + + public Explanation(String expressionNodeId){ + this.expressionNodeId = expressionNodeId; + } + + public String getExpressionNodeId(){ + return expressionNodeId; + } + + public String getExpressionType(){ + return expressionType; + } + public void setExpressionType(String expressionType){ + this.expressionType = expressionType; + } + public Explanation withExpressionType(String expressionType){ + setExpressionType(expressionType); + return this; + } + + public String getFunctionName(){ + return functionName; + } + public void setFunctionName(String functionName){ + this.functionName = functionName; + } + public Explanation withFunctionName(String functionName){ + setFunctionName(functionName); + return this; + } + + public String getImplementingClass(){ + return implementingClass; + } + public void setImplementingClass(String implementingClass){ + this.implementingClass = implementingClass; + } + public Explanation withImplementingClass(String implementingClass){ + setImplementingClass(implementingClass); + return this; + } + + public String getExpression(){ + return expression; + } + public void setExpression(String expression){ + this.expression = expression; + } + public Explanation withExpression(String expression){ + setExpression(expression); + return this; + } + + public String getNote(){ + return note; + } + public void setNote(String note){ + this.note = note; + } + public Explanation withNote(String note){ + setNote(note); + return this; + } + + public List getHelpers(){ + return helpers; + } + public void setHelpers(List helpers){ + this.helpers = helpers; + } + public Explanation withHelpers(List helpers){ + setHelpers(helpers); + return this; + } + public Explanation withHelpers(Explanation[] helpers){ + for(Explanation helper : helpers){ + addHelper(helper); + } + return this; + } + public Explanation withHelper(Explanation helper){ + addHelper(helper); + return this; + } + public void addHelper(Explanation helper){ + if(null == helpers){ + helpers = new ArrayList(1); + } + helpers.add(helper); + } + + public Map toMap(){ + Map map = new HashMap(); + if(null != expressionNodeId){ map.put("expressionNodeId",expressionNodeId); } + if(null != expressionType){ map.put("expressionType",expressionType); } + if(null != functionName){ map.put("functionName",functionName); } + if(null != implementingClass){ map.put("implementingClass",implementingClass); } + if(null != expression){ map.put("expression",expression); } + if(null != note){ map.put("note",note); } + + if(null != helpers && 0 != helpers.size()){ + List> helperMaps = new ArrayList>(); + for(Explanation helper : helpers){ + helperMaps.add(helper.toMap()); + } + map.put("helpers", helperMaps); + } + + return map; + } + + public static interface ExpressionType{ + public static final String GRAPH_SOURCE = "graph-source"; + public static final String STREAM_SOURCE = "stream-source"; + public static final String STREAM_DECORATOR = "stream-decorator"; + public static final String DATASTORE = "datastore"; + public static final String METRIC = "metric"; + public static final String OPERATION = "operation"; + public static final String EQUALITOR = "equalitor"; + public static final String SORTER = "sorter"; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java index b44bb558836..0ee41fa1394 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/Expressible.java @@ -25,4 +25,13 @@ public interface Expressible { // public String getFunctionName(); // public void setFunctionName(String functionName); StreamExpressionParameter toExpression(StreamFactory factory) throws IOException; + + /** + * Returns an explanation about the stream object + * @param factory Stream factory for this, contains information about the function name + * @return Explanation about this stream object containing explanations of any child stream objects + * @throws IOException throw on any error + */ + Explanation toExplanation(StreamFactory factory) throws IOException; + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExplanation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExplanation.java new file mode 100644 index 00000000000..fdf8749c360 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamExplanation.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream.expr; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Explanation containing details about a stream expression + */ +public class StreamExplanation extends Explanation { + + private List children; + + public StreamExplanation(String expressionNodeId){ + super(expressionNodeId); + } + + public List getChildren(){ + return children; + } + public void setChildren(List children){ + this.children = children; + } + public StreamExplanation withChildren(List children){ + setChildren(children); + return this; + } + public StreamExplanation withChildren(Explanation[] children){ + for(Explanation child : children){ + addChild(child); + } + return this; + } + public void addChild(Explanation child){ + if(null == children){ + children = new ArrayList(1); + } + children.add(child); + } + + public Map toMap(){ + Map map = super.toMap(); + + if(null != children && 0 != children.size()){ + List> childrenMaps = new ArrayList>(); + for(Explanation child : children){ + childrenMaps.add(child.toMap()); + } + map.put("children", childrenMaps); + } + + return map; + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java index 07a400a50e8..4e2485483a7 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java @@ -16,14 +16,20 @@ */ package org.apache.solr.client.solrj.io.stream.metrics; +import java.io.IOException; import java.io.Serializable; +import java.util.UUID; import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public abstract class Metric implements Serializable, Expressible { private static final long serialVersionUID = 1L; + private UUID metricNodeId = UUID.randomUUID(); private String functionName; private String identifier; @@ -51,6 +57,19 @@ public abstract class Metric implements Serializable, Expressible { this.identifier = sb.toString(); } + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new Explanation(metricNodeId.toString()) + .withFunctionName(functionName) + .withImplementingClass(getClass().getName()) + .withExpression(toExpression(factory).toString()) + .withExpressionType(ExpressionType.METRIC); + } + + public UUID getMetricNodeId(){ + return metricNodeId; + } + public abstract double getValue(); public abstract void update(Tuple tuple); public abstract Metric newInstance(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/RecordCountStream.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/RecordCountStream.java index 7a9c7c6fb56..722a4939eaf 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/RecordCountStream.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/RecordCountStream.java @@ -24,10 +24,13 @@ import java.util.Locale; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; public class RecordCountStream extends TupleStream implements Expressible, Serializable { @@ -54,20 +57,43 @@ public class RecordCountStream extends TupleStream implements Expressible, Seria } @Override - public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + public StreamExpression toExpression(StreamFactory factory) throws IOException{ + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException { // function name StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); - // stream - if(stream instanceof Expressible){ - expression.addParameter(((Expressible)stream).toExpression(factory)); + if(includeStreams){ + // stream + if(stream instanceof Expressible){ + expression.addParameter(((Expressible)stream).toExpression(factory)); + } + else{ + throw new IOException("This RecordCountStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } } else{ - throw new IOException("This RecordCountStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + expression.addParameter(""); } return expression; } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + return new StreamExplanation(getStreamNodeId().toString()) + .withChildren(new Explanation[]{ + stream.toExplanation(factory) + }) + .withFunctionName(factory.getFunctionName(this.getClass())) + .withImplementingClass(this.getClass().getName()) + .withExpressionType(ExpressionType.STREAM_DECORATOR) + .withExpression(toExpression(factory, false).toString()) + ; + } public void close() throws IOException { this.stream.close(); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExplanationTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExplanationTest.java new file mode 100644 index 00000000000..88898d8480b --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionToExplanationTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream; + +import junit.framework.Assert; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.ops.GroupOperation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; +import org.apache.solr.client.solrj.io.stream.metrics.Metric; +import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; +import org.junit.Test; + +/** + **/ + +public class StreamExpressionToExplanationTest extends LuceneTestCase { + + private StreamFactory factory; + + public StreamExpressionToExplanationTest() { + super(); + + factory = new StreamFactory() + .withCollectionZkHost("collection1", "testhost:1234") + .withCollectionZkHost("collection2", "testhost:1234") + .withFunctionName("search", CloudSolrStream.class) + .withFunctionName("select", SelectStream.class) + .withFunctionName("merge", MergeStream.class) + .withFunctionName("unique", UniqueStream.class) + .withFunctionName("top", RankStream.class) + .withFunctionName("reduce", ReducerStream.class) + .withFunctionName("group", GroupOperation.class) + .withFunctionName("update", UpdateStream.class) + .withFunctionName("stats", StatsStream.class) + .withFunctionName("facet", FacetStream.class) + .withFunctionName("jdbc", JDBCStream.class) + .withFunctionName("intersect", IntersectStream.class) + .withFunctionName("complement", ComplementStream.class) + .withFunctionName("count", CountMetric.class) + .withFunctionName("sum", SumMetric.class) + .withFunctionName("min", MinMetric.class) + .withFunctionName("max", MaxMetric.class) + .withFunctionName("avg", MeanMetric.class) + .withFunctionName("daemon", DaemonStream.class) + .withFunctionName("topic", TopicStream.class) + ; + } + + @Test + public void testCloudSolrStream() throws Exception { + + CloudSolrStream stream; + + // Basic test + stream = new CloudSolrStream(StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("search", explanation.getFunctionName()); + Assert.assertEquals(CloudSolrStream.class.getName(), explanation.getImplementingClass()); + + } + + @Test + public void testSelectStream() throws Exception { + + SelectStream stream; + + // Basic test + stream = new SelectStream(StreamExpressionParser.parse("select(\"a_s as fieldA\", search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"))"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("select", explanation.getFunctionName()); + Assert.assertEquals(SelectStream.class.getName(), explanation.getImplementingClass()); + } + + @Test + public void testDaemonStream() throws Exception { + + DaemonStream stream; + + // Basic test + stream = new DaemonStream(StreamExpressionParser.parse("daemon(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), id=\"blah\", runInterval=\"1000\", queueSize=\"100\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("daemon", explanation.getFunctionName()); + Assert.assertEquals(DaemonStream.class.getName(), explanation.getImplementingClass()); + } + + @Test + public void testTopicStream() throws Exception { + + TopicStream stream; + + // Basic test + stream = new TopicStream(StreamExpressionParser.parse("topic(collection2, collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", id=\"blah\", checkpointEvery=1000)"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("topic", explanation.getFunctionName()); + Assert.assertEquals(TopicStream.class.getName(), explanation.getImplementingClass()); + } + + + @Test + public void testStatsStream() throws Exception { + + StatsStream stream; + + // Basic test + stream = new StatsStream(StreamExpressionParser.parse("stats(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", sum(a_i), avg(a_i), count(*), min(a_i), max(a_i))"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("stats", explanation.getFunctionName()); + Assert.assertEquals(StatsStream.class.getName(), explanation.getImplementingClass()); + + } + + @Test + public void testUniqueStream() throws Exception { + + UniqueStream stream; + + // Basic test + stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("unique", explanation.getFunctionName()); + Assert.assertEquals(UniqueStream.class.getName(), explanation.getImplementingClass()); + } + + @Test + public void testMergeStream() throws Exception { + + MergeStream stream; + + // Basic test + stream = new MergeStream(StreamExpressionParser.parse("merge(" + + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f asc, a_s asc\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("merge", explanation.getFunctionName()); + Assert.assertEquals(MergeStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testRankStream() throws Exception { + + RankStream stream; + String expressionString; + + // Basic test + stream = new RankStream(StreamExpressionParser.parse("top(" + + "n=3," + + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc,a_i asc\")," + + "sort=\"a_f asc, a_i asc\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("top", explanation.getFunctionName()); + Assert.assertEquals(RankStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testReducerStream() throws Exception { + + ReducerStream stream; + String expressionString; + + // Basic test + stream = new ReducerStream(StreamExpressionParser.parse("reduce(" + + "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\")," + + "by=\"a_s\", group(sort=\"a_i desc\", n=\"5\"))"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("reduce", explanation.getFunctionName()); + Assert.assertEquals(ReducerStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testUpdateStream() throws Exception { + StreamExpression expression = StreamExpressionParser.parse("update(" + + "collection2, " + + "batchSize=5, " + + "search(" + + "collection1, " + + "q=*:*, " + + "fl=\"id,a_s,a_i,a_f\", " + + "sort=\"a_f asc, a_i asc\"))"); + + UpdateStream updateStream = new UpdateStream(expression, factory); + Explanation explanation = updateStream.toExplanation(factory); + Assert.assertEquals("solr (collection2)", explanation.getFunctionName()); + Assert.assertEquals("Solr/Lucene", explanation.getImplementingClass()); + + StreamExplanation updateExplanation = (StreamExplanation)explanation; + Assert.assertEquals(1, updateExplanation.getChildren().size()); + Assert.assertEquals("update", updateExplanation.getChildren().get(0).getFunctionName()); + Assert.assertEquals(UpdateStream.class.getName(), updateExplanation.getChildren().get(0).getImplementingClass()); + + } + + @Test + public void testFacetStream() throws Exception { + + FacetStream stream; + String expressionString; + + // Basic test + stream = new FacetStream(StreamExpressionParser.parse("facet(" + + "collection1, " + + "q=\"*:*\", " + + "buckets=\"a_s\", " + + "bucketSorts=\"sum(a_i) asc\", " + + "bucketSizeLimit=100, " + + "sum(a_i), sum(a_f), " + + "min(a_i), min(a_f), " + + "max(a_i), max(a_f), " + + "avg(a_i), avg(a_f), " + + "count(*)" + + ")"), factory); + expressionString = stream.toExpression(factory).toString(); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("facet", explanation.getFunctionName()); + Assert.assertEquals(FacetStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testJDBCStream() throws Exception { + + JDBCStream stream; + String expressionString; + + // Basic test + stream = new JDBCStream(StreamExpressionParser.parse("jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("jdbc", explanation.getFunctionName()); + Assert.assertEquals(JDBCStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testIntersectStream() throws Exception { + IntersectStream stream; + String expressionString; + + // Basic test + stream = new IntersectStream(StreamExpressionParser.parse("intersect(" + + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f, a_s\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("intersect", explanation.getFunctionName()); + Assert.assertEquals(IntersectStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size()); + } + + @Test + public void testComplementStream() throws Exception { + ComplementStream stream; + String expressionString; + + // Basic test + stream = new ComplementStream(StreamExpressionParser.parse("complement(" + + "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\")," + + "on=\"a_f, a_s\")"), factory); + Explanation explanation = stream.toExplanation(factory); + Assert.assertEquals("complement", explanation.getFunctionName()); + Assert.assertEquals(ComplementStream.class.getName(), explanation.getImplementingClass()); + Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size()); + } +}