This commit is contained in:
Karl Wright 2016-04-19 12:06:32 -04:00
commit 8ba1a3880b
53 changed files with 2237 additions and 175 deletions

View File

@ -98,6 +98,10 @@ New Features
* SOLR-8349: Allow sharing of large in memory data structures across cores (Gus Heck, noble)
* SOLR-9009: Adds ability to get an Explanation of a Streaming Expression (Dennis Gove)
* SOLR-8918: Adds Streaming to the admin page under the collections section. Includes
ability to see graphically the expression explanation (Dennis Gove)
Bug Fixes
----------------------

View File

@ -49,7 +49,10 @@ import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
@ -1289,6 +1292,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
public void setStreamContext(StreamContext context) {
stream.setStreamContext(context);
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName("SQL LIMIT")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
public Tuple read() throws IOException {
++count;
@ -1348,6 +1364,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
children.add(stream);
return children;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName("SQL HAVING")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
public void setStreamContext(StreamContext context) {
stream.setStreamContext(context);
@ -1385,6 +1414,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
this.catalogs = new ArrayList<>();
this.catalogs.add(this.zkHost);
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("SQL CATALOG")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
public Tuple read() throws IOException {
Map<String, String> fields = new HashMap<>();
@ -1425,6 +1464,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
public void open() throws IOException {
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("SQL SCHEMA")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
public Tuple read() throws IOException {
Map<String, String> fields = new HashMap<>();
@ -1470,6 +1519,16 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
}
Collections.sort(this.tables);
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("SQL TABLE")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
public Tuple read() throws IOException {
Map<String, String> fields = new HashMap<>();
@ -1517,6 +1576,19 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware , Pe
public void open() throws IOException {
this.stream.open();
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName("SQL METADATA")
.withExpression("--non-expressible--")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR);
}
// Return a metadata tuple as the first tuple and then pass through to the underlying stream.
public Tuple read() throws IOException {

View File

@ -35,8 +35,11 @@ import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
@ -191,6 +194,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.setSolrClientCache(clientCache);
context.put("core", this.coreName);
tupleStream.setStreamContext(context);
// if asking for explanation then go get it
if(params.getBool("explain", false)){
rsp.add("explanation", tupleStream.toExplanation(this.streamFactory));
}
if(tupleStream instanceof DaemonStream) {
DaemonStream daemonStream = (DaemonStream)tupleStream;
if(daemons.containsKey(daemonStream.getId())) {
@ -271,6 +280,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public List<TupleStream> children() {
return null;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("error")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
String msg = e.getMessage();
@ -312,6 +331,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("daemon-collection")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
if(it.hasNext()) {
return it.next().getInfo();
@ -347,6 +376,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("daemon-response")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
if (sendEOF) {
Map m = new HashMap();
@ -391,6 +430,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return this.tupleStream.children();
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("timer")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() throws IOException {
Tuple tuple = this.tupleStream.read();
if(tuple.EOF) {

View File

@ -30,6 +30,7 @@ import org.apache.lucene.index.IndexableField;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.common.EnumFieldValue;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
@ -165,6 +166,8 @@ public abstract class TextResponseWriter {
writeNamedList(name, (NamedList)val);
} else if (val instanceof TupleStream) {
writeTupleStream((TupleStream) val);
} else if (val instanceof Explanation){
writeExplanation((Explanation) val);
} else if (val instanceof Path) {
writeStr(name, ((Path) val).toAbsolutePath().toString(), true);
} else if (val instanceof Iterable) {
@ -316,6 +319,10 @@ public abstract class TextResponseWriter {
tupleStream.writeStreamClose(writer);
tupleStream.close();
}
public void writeExplanation(Explanation explanation) throws IOException {
writeMap(null, explanation.toMap(), false, true);
}
/** if this form of the method is called, val is the Java string form of a double */

View File

@ -16,9 +16,13 @@
*/
package org.apache.solr.client.solrj.io.comp;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@ -29,6 +33,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class FieldComparator implements StreamComparator {
private static final long serialVersionUID = 1;
private UUID comparatorNodeId = UUID.randomUUID();
private String leftFieldName;
private String rightFieldName;
@ -84,6 +89,14 @@ public class FieldComparator implements StreamComparator {
return new StreamExpressionValue(sb.toString());
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(comparatorNodeId.toString())
.withExpressionType(ExpressionType.SORTER)
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
/*
* What're we doing here messing around with lambdas for the comparator logic?

View File

@ -19,12 +19,15 @@ package org.apache.solr.client.solrj.io.comp;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
/**
@ -34,6 +37,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class MultipleFieldComparator implements StreamComparator {
private static final long serialVersionUID = 1;
private UUID comparatorNodeId = UUID.randomUUID();
private StreamComparator[] comps;
@ -91,6 +95,14 @@ public class MultipleFieldComparator implements StreamComparator {
return false;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(comparatorNodeId.toString())
.withExpressionType(ExpressionType.SORTER)
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
@Override
public MultipleFieldComparator copyAliased(Map<String,String> aliases){

View File

@ -16,15 +16,15 @@
*/
package org.apache.solr.client.solrj.io.eq;
import java.io.Serializable;
import java.util.Comparator;
import java.io.IOException;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class FieldEqualitor implements StreamEqualitor {
private static final long serialVersionUID = 1;
private UUID equalitorNodeId = UUID.randomUUID();
private String leftFieldName;
private String rightFieldName;
@ -63,6 +64,14 @@ public class FieldEqualitor implements StreamEqualitor {
return new StreamExpressionValue(sb.toString());
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(equalitorNodeId.toString())
.withExpressionType(ExpressionType.EQUALITOR)
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
public boolean test(Tuple leftTuple, Tuple rightTuple) {

View File

@ -17,14 +17,17 @@
package org.apache.solr.client.solrj.io.eq;
import java.io.IOException;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
/**
@ -34,6 +37,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class MultipleFieldEqualitor implements StreamEqualitor {
private static final long serialVersionUID = 1;
private UUID equalitorNodeId = UUID.randomUUID();
private StreamEqualitor[] eqs;
@ -60,6 +64,14 @@ public class MultipleFieldEqualitor implements StreamEqualitor {
return new StreamExpressionValue(sb.toString());
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(equalitorNodeId.toString())
.withExpressionType(ExpressionType.EQUALITOR)
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
@Override
public boolean isDerivedFrom(StreamEqualitor base){

View File

@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.stream.*;
@ -36,12 +37,15 @@ import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -50,7 +54,7 @@ public class GatherNodesStream extends TupleStream implements Expressible {
private String zkHost;
private String collection;
private StreamContext streamContext;
private Map queryParams;
private Map<String,String> queryParams;
private String traverseFrom;
private String traverseTo;
private String gather;
@ -250,21 +254,30 @@ public class GatherNodesStream extends TupleStream implements Expressible {
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// collection
expression.addParameter(collection);
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
if(includeStreams){
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
}
else{
throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
Set<Map.Entry> entries = queryParams.entrySet();
Set<Map.Entry<String,String>> entries = queryParams.entrySet();
// parameters
for(Map.Entry param : entries){
String value = param.getValue().toString();
@ -300,6 +313,37 @@ public class GatherNodesStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.GRAPH_SOURCE);
explanation.setExpression(toExpression(factory).toString());
// one child is a stream
explanation.addChild(tupleStream.toExplanation(factory));
// one child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName("solr (graph)");
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(queryParams.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
if(null != metrics){
for(Metric metric : metrics){
explanation.addHelper(metric.toExplanation(factory));
}
}
return explanation;
}
public void setStreamContext(StreamContext context) {
this.traversal = (Traversal) context.get("traversal");
@ -576,5 +620,15 @@ public class GatherNodesStream extends TupleStream implements Expressible {
return new Tuple(map);
}
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("non-expressible")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_SOURCE)
.withExpression("non-expressible");
}
}
}

View File

@ -30,18 +30,22 @@ import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
@ -61,7 +65,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
private boolean found;
private StreamContext streamContext;
private int threads;
private Map queryParams;
private Map<String,String> queryParams;
public ShortestPathStream(String zkHost,
String collection,
@ -221,7 +225,7 @@ public class ShortestPathStream extends TupleStream implements Expressible {
// collection
expression.addParameter(collection);
Set<Map.Entry> entries = queryParams.entrySet();
Set<Map.Entry<String,String>> entries = queryParams.entrySet();
// parameters
for(Map.Entry param : entries){
String value = param.getValue().toString();
@ -243,6 +247,27 @@ public class ShortestPathStream extends TupleStream implements Expressible {
expression.addParameter(new StreamExpressionNamedParameter("edge", fromField+"="+toField));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.GRAPH_SOURCE);
explanation.setExpression(toExpression(factory).toString());
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName("solr (graph)");
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(queryParams.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
return explanation;
}
public void setStreamContext(StreamContext context) {
this.streamContext = context;

View File

@ -18,8 +18,11 @@ package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@ -33,7 +36,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ConcatOperation implements StreamOperation {
private static final long serialVersionUID = 1;
private UUID operationNodeId = UUID.randomUUID();
private String[] fields;
private String as;
private String delim;
@ -94,4 +98,13 @@ public class ConcatOperation implements StreamOperation {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(operationNodeId.toString())
.withExpressionType(ExpressionType.OPERATION)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
}

View File

@ -16,32 +16,20 @@
*/
package org.apache.solr.client.solrj.io.ops;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.PriorityQueue;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class DistinctOperation implements ReduceOperation {
private static final long serialVersionUID = 1L;
private UUID operationNodeId = UUID.randomUUID();
private Tuple current;
public DistinctOperation(StreamExpression expression, StreamFactory factory) throws IOException {
@ -60,6 +48,15 @@ public class DistinctOperation implements ReduceOperation {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(operationNodeId.toString())
.withExpressionType(ExpressionType.OPERATION)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
public Tuple reduce() {
// Return the tuple after setting current to null. This will ensure the next call to
// operate stores that tuple

View File

@ -16,29 +16,34 @@
*/
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.PriorityQueue;
public class GroupOperation implements ReduceOperation {
private static final long serialVersionUID = 1L;
private UUID operationNodeId = UUID.randomUUID();
private PriorityQueue<Tuple> priorityQueue;
private Comparator comp;
private StreamComparator streamComparator;
@ -86,6 +91,18 @@ public class GroupOperation implements ReduceOperation {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(operationNodeId.toString())
.withExpressionType(ExpressionType.OPERATION)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString())
.withHelpers(new Explanation[]{
streamComparator.toExplanation(factory)
});
}
public Tuple reduce() {
Map map = new HashMap();
List<Map> list = new ArrayList();
@ -122,6 +139,7 @@ public class GroupOperation implements ReduceOperation {
}
public int compare(Tuple t1, Tuple t2) {
// Couldn't this be comp.compare(t2,t1) ?
return comp.compare(t1, t2)*(-1);
}
}

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@ -80,4 +81,9 @@ public class ReplaceOperation implements StreamOperation {
return replacer.toExpression(factory);
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException{
return replacer.toExplanation(factory);
}
}

View File

@ -18,13 +18,16 @@ package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
/**
* Implementation of replace(...., withField=fieldName)
@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ReplaceWithFieldOperation implements StreamOperation {
private static final long serialVersionUID = 1;
private UUID operationNodeId = UUID.randomUUID();
private boolean wasBuiltWithFieldName;
private String originalFieldName;
@ -108,4 +112,12 @@ public class ReplaceWithFieldOperation implements StreamOperation {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(operationNodeId.toString())
.withExpressionType(ExpressionType.OPERATION)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
}

View File

@ -18,13 +18,16 @@ package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.util.Locale;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
/**
* Implementation of replace(...., withValue="some value")
@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ReplaceWithValueOperation implements StreamOperation {
private static final long serialVersionUID = 1;
private UUID operationNodeId = UUID.randomUUID();
private boolean wasBuiltWithFieldName;
private String fieldName;
@ -113,4 +117,12 @@ public class ReplaceWithValueOperation implements StreamOperation {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(operationNodeId.toString())
.withExpressionType(ExpressionType.OPERATION)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
}

View File

@ -16,13 +16,10 @@
*/
package org.apache.solr.client.solrj.io.ops;
import java.io.IOException;
import java.io.Serializable;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
/**
* Interface for any operation one can perform on a tuple in a TupleStream

View File

@ -26,12 +26,13 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
@ -41,10 +42,12 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.cloud.ClusterState;
@ -154,7 +157,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
// function name
@ -194,6 +197,29 @@ public class CloudSolrStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
if(null != params){
child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
}
explanation.addChild(child);
return explanation;
}
private void init(String collectionName, String zkHost, Map params) throws IOException {
this.zkHost = zkHost;
this.collection = collectionName;

View File

@ -22,12 +22,13 @@ import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -88,23 +89,33 @@ public class ComplementStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(streamA instanceof Expressible){
expression.addParameter(((Expressible)streamA).toExpression(factory));
if(includeStreams){
// streams
if(streamA instanceof Expressible){
expression.addParameter(((Expressible)streamA).toExpression(factory));
}
else{
throw new IOException("This IntersectionStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
if(originalStreamB instanceof Expressible){
expression.addParameter(((Expressible)originalStreamB).toExpression(factory));
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This IntersectionStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
if(originalStreamB instanceof Expressible){
expression.addParameter(((Expressible)originalStreamB).toExpression(factory));
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
expression.addParameter("<stream>");
}
// on
@ -112,6 +123,22 @@ public class ComplementStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
streamA.toExplanation(factory),
originalStreamB.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelper(eq.toExplanation(factory))
;
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
@ -119,7 +146,7 @@ public class ComplementStream extends TupleStream implements Expressible {
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(streamA);
l.add(streamB);
return l;

View File

@ -15,23 +15,26 @@
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.lang.invoke.MethodHandles;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -100,15 +103,24 @@ public class DaemonStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
} else {
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
if(includeStreams){
// streams
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
} else {
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
expression.addParameter(new StreamExpressionNamedParameter("id", id));
@ -117,6 +129,19 @@ public class DaemonStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[] {
tupleStream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
}
public int remainingCapacity() {
return this.queue.remainingCapacity();

View File

@ -24,6 +24,10 @@ import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +74,16 @@ public class ExceptionStream extends TupleStream {
return new Tuple(fields);
}
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("non-expressible")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_SOURCE)
.withExpression("non-expressible");
}
public StreamComparator getStreamSort() {
return this.stream.getStreamSort();

View File

@ -17,13 +17,14 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
@ -33,7 +34,10 @@ import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@ -58,7 +62,7 @@ public class FacetStream extends TupleStream implements Expressible {
private Metric[] metrics;
private int bucketSizeLimit;
private FieldComparator[] bucketSorts;
private List<Tuple> tuples = new ArrayList();
private List<Tuple> tuples = new ArrayList<Tuple>();
private int index;
private String zkHost;
private Map<String, String> props;
@ -267,7 +271,31 @@ public class FacetStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
// TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a
// parallel stream.
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
return explanation;
}
public void setStreamContext(StreamContext context) {
cache = context.getSolrClientCache();
}

View File

@ -24,7 +24,10 @@ import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -99,17 +102,27 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(hashStream instanceof Expressible && fullStream instanceof Expressible){
expression.addParameter(((Expressible)fullStream).toExpression(factory));
expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory)));
if(includeStreams){
// streams
if(hashStream instanceof Expressible && fullStream instanceof Expressible){
expression.addParameter(((Expressible)fullStream).toExpression(factory));
expression.addParameter(new StreamExpressionNamedParameter("hashed", ((Expressible)hashStream).toExpression(factory)));
}
else{
throw new IOException("This HashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This HashJoinStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
expression.addParameter("hashed=<stream>");
}
// on
@ -122,6 +135,20 @@ public class HashJoinStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
fullStream.toExplanation(factory),
hashStream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
}
public void setStreamContext(StreamContext context) {
this.hashStream.setStreamContext(context);
@ -129,7 +156,7 @@ public class HashJoinStream extends TupleStream implements Expressible {
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(hashStream);
l.add(fullStream);
return l;

View File

@ -22,12 +22,13 @@ import java.util.List;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -88,23 +89,33 @@ public class IntersectStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(streamA instanceof Expressible){
expression.addParameter(((Expressible)streamA).toExpression(factory));
if(includeStreams){
// streams
if(streamA instanceof Expressible){
expression.addParameter(((Expressible)streamA).toExpression(factory));
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
if(originalStreamB instanceof Expressible){
expression.addParameter(((Expressible)originalStreamB).toExpression(factory));
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
if(originalStreamB instanceof Expressible){
expression.addParameter(((Expressible)originalStreamB).toExpression(factory));
}
else{
throw new IOException("This IntersectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
expression.addParameter("<stream>");
}
// on
@ -113,13 +124,28 @@ public class IntersectStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
streamA.toExplanation(factory),
originalStreamB.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelper(eq.toExplanation(factory));
}
public void setStreamContext(StreamContext context) {
this.streamA.setStreamContext(context);
this.streamB.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(streamA);
l.add(streamB);
return l;

View File

@ -34,7 +34,10 @@ import java.util.Properties;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@ -348,6 +351,40 @@ public class JDBCStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
StreamExpression expression = (StreamExpression)toExpression(factory);
explanation.setExpression(expression.toString());
String driverClassName = this.driverClassName;
if(null == driverClassName){
try{
driverClassName = DriverManager.getDriver(connectionUrl).getClass().getName();
}
catch(Exception e){
driverClassName = String.format(Locale.ROOT, "Failed to find driver for connectionUrl='%s'", connectionUrl);
}
}
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName("jdbc-source");
child.setImplementingClass(driverClassName);
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(sqlQuery);
explanation.addChild(child);
return explanation;
}
@Override
public List<TupleStream> children() {
return new ArrayList<TupleStream>();

View File

@ -26,7 +26,10 @@ import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -87,13 +90,22 @@ public abstract class JoinStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
for (PushBackStream stream : streams) {
expression.addParameter(stream.toExpression(factory));
if(includeStreams){
expression.addParameter(stream.toExpression(factory));
}
else{
expression.addParameter("<stream>");
}
}
// on
@ -107,6 +119,23 @@ public abstract class JoinStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
explanation.setExpression(toExpression(factory, false).toString());
explanation.addHelper(eq.toExplanation(factory));
for(TupleStream stream : streams){
explanation.addChild(stream.toExplanation(factory));
}
return explanation;
}
public void setStreamContext(StreamContext context) {
for (PushBackStream stream : streams) {
stream.setStreamContext(context);

View File

@ -24,7 +24,10 @@ import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -95,13 +98,22 @@ public class MergeStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
for(PushBackStream stream : streams){
expression.addParameter(stream.toExpression(factory));
if(includeStreams){
expression.addParameter(stream.toExpression(factory));
}
else{
expression.addParameter("<stream>");
}
}
// on
@ -109,6 +121,23 @@ public class MergeStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
explanation.setExpression(toExpression(factory, false).toString());
explanation.addHelper(comp.toExplanation(factory));
for(PushBackStream stream : streams){
explanation.addChild(stream.toExplanation(factory));
}
return explanation;
}
public void setStreamContext(StreamContext context) {
for(PushBackStream stream : streams){
@ -117,7 +146,7 @@ public class MergeStream extends TupleStream implements Expressible {
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
for(PushBackStream stream : streams){
l.add(stream);
}

View File

@ -30,16 +30,20 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Random;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@ -163,8 +167,11 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
@ -174,12 +181,16 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
// workers
expression.addParameter(new StreamExpressionNamedParameter("workers", Integer.toString(workers)));
// stream
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
if(includeStreams){
if(tupleStream instanceof Expressible){
expression.addParameter(((Expressible)tupleStream).toExpression(factory));
}
else{
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
// sort
@ -191,6 +202,24 @@ public class ParallelStream extends CloudSolrStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
explanation.setExpression(toExpression(factory, false).toString());
// add a child for each worker
for(int idx = 0; idx < workers; ++idx){
explanation.addChild(tupleStream.toExplanation(factory));
}
return explanation;
}
public List<TupleStream> children() {
List l = new ArrayList();
l.add(tupleStream);

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
@ -51,12 +52,16 @@ public class PushBackStream extends TupleStream implements Expressible {
throw new IOException("This PushBackStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
public Explanation toExplanation(StreamFactory factory) throws IOException{
return stream.toExplanation(factory);
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(stream);
return l;
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Random;
import java.util.Iterator;
@ -31,12 +32,15 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
@ -132,6 +136,28 @@ public class RandomStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
if(null != props){
child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
}
explanation.addChild(child);
return explanation;
}
public void setStreamContext(StreamContext context) {
cache = context.getSolrClientCache();

View File

@ -28,7 +28,10 @@ import java.util.PriorityQueue;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -101,19 +104,28 @@ public class RankStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// n
expression.addParameter(new StreamExpressionNamedParameter("n", Integer.toString(size)));
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
if(includeStreams){
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This RankStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
// sort
@ -122,19 +134,33 @@ public class RankStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelper(comp.toExplanation(factory));
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(stream);
return l;
}
public void open() throws IOException {
this.top = new PriorityQueue(size, new ReverseComp(comp));
this.topList = new LinkedList();
this.top = new PriorityQueue<Tuple>(size, new ReverseComp(comp));
this.topList = new LinkedList<Tuple>();
stream.open();
}
@ -187,6 +213,7 @@ public class RankStream extends TupleStream implements Expressible {
class ReverseComp implements Comparator<Tuple>, Serializable {
private static final long serialVersionUID = 1L;
private StreamComparator comp;
public ReverseComp(StreamComparator comp) {

View File

@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
@ -32,7 +30,10 @@ import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.ReduceOperation;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -134,12 +135,21 @@ public class ReducerStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
expression.addParameter(stream.toExpression(factory));
if(includeStreams){
expression.addParameter(stream.toExpression(factory));
}
else{
expression.addParameter("<stream>");
}
// over
if(eq instanceof Expressible){
@ -158,12 +168,29 @@ public class ReducerStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelpers(new Explanation[]{
eq.toExplanation(factory),
op.toExplanation(factory)
});
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();
List<TupleStream> l = new ArrayList<TupleStream>();
l.add(stream);
return l;
}

View File

@ -17,23 +17,24 @@
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.ArrayList;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.HashKey;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
@ -121,12 +122,21 @@ public class RollupStream extends TupleStream implements Expressible {
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
expression.addParameter(tupleStream.toExpression(factory));
if(includeStreams){
expression.addParameter(tupleStream.toExpression(factory));
}
else{
expression.addParameter("<stream>");
}
// over
StringBuilder overBuilder = new StringBuilder();
@ -143,6 +153,25 @@ public class RollupStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
Explanation explanation = new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
tupleStream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
for(Metric metric : metrics){
explanation.withHelper(metric.toExplanation(factory));
}
return explanation;
}
public void setStreamContext(StreamContext context) {
this.tupleStream.setStreamContext(context);
@ -176,7 +205,7 @@ public class RollupStream extends TupleStream implements Expressible {
return tuple;
}
Map map = new HashMap();
Map<String,Object> map = new HashMap<String,Object>();
for(Metric metric : currentMetrics) {
map.put(metric.getIdentifier(), metric.getValue());
}
@ -207,7 +236,7 @@ public class RollupStream extends TupleStream implements Expressible {
} else {
Tuple t = null;
if(currentMetrics != null) {
Map map = new HashMap();
Map<String,Object> map = new HashMap<String,Object>();
for(Metric metric : currentMetrics) {
map.put(metric.getIdentifier(), metric.getValue());
}

View File

@ -26,7 +26,10 @@ import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.ops.StreamOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -110,16 +113,25 @@ public class SelectStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
if(includeStreams){
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This SelectStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
// selects
@ -138,6 +150,25 @@ public class SelectStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
Explanation explanation = new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString());
for(StreamOperation operation : operations){
explanation.addHelper(operation.toExplanation(factory));
}
return explanation;
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);

View File

@ -19,15 +19,19 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.slf4j.Logger;
@ -149,6 +153,16 @@ public class SolrStream extends TupleStream {
return buf.toString();
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("non-expressible")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_SOURCE)
.withExpression("non-expressible");
}
/**
* Closes the Stream to a single Solr Instance
* */

View File

@ -25,11 +25,14 @@ import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
/**
@ -106,16 +109,25 @@ public class SortStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
if(includeStreams){
// streams
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This SortStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
// by
@ -128,6 +140,20 @@ public class SortStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelper(comparator.toExplanation(factory));
}
public void setStreamContext(StreamContext context) {
this.stream.setStreamContext(context);

View File

@ -23,13 +23,17 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
@ -149,6 +153,29 @@ public class StatsStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName(String.format(Locale.ROOT, "solr (worker ? of ?)"));
// TODO: fix this so we know the # of workers - check with Joel about a Stat's ability to be in a
// parallel stream.
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(props.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
return explanation;
}
public void setStreamContext(StreamContext context) {
cache = context.getSolrClientCache();

View File

@ -23,33 +23,36 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Random;
import java.util.Set;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
@ -61,7 +64,6 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long serialVersionUID = 1;
private long count;
@ -69,7 +71,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private String id;
protected long checkpointEvery;
private Map<String, Long> checkpoints = new HashMap();
private Map<String, Long> checkpoints = new HashMap<String, Long>();
private String checkpointCollection;
public TopicStream(String zkHost,
@ -178,7 +180,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
@ -204,6 +206,42 @@ public class TopicStream extends CloudSolrStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
explanation.setImplementingClass(this.getClass().getName());
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
explanation.setExpression(toExpression(factory).toString());
{
// child 1 is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
// TODO: fix this so we know the # of workers - check with Joel about a Topic's ability to be in a
// parallel stream.
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(params.entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
explanation.addChild(child);
}
{
// child 2 is a place where we store and read checkpoint info from
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-checkpoint");
child.setFunctionName(String.format(Locale.ROOT, "solr (checkpoint store)"));
child.setImplementingClass("Solr/Lucene");
child.setExpressionType(ExpressionType.DATASTORE);
child.setExpression(String.format(Locale.ROOT, "id=%s, collection=%s, checkpointEvery=%d", id, checkpointCollection, checkpointEvery));
explanation.addChild(child);
}
return explanation;
}
public List<TupleStream> children() {
List<TupleStream> l = new ArrayList();

View File

@ -21,14 +21,19 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.Writer;
import java.util.List;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public abstract class TupleStream implements Closeable, Serializable {
private static final long serialVersionUID = 1;
private UUID streamNodeId = UUID.randomUUID();
public TupleStream() {
@ -54,8 +59,13 @@ public abstract class TupleStream implements Closeable, Serializable {
public abstract StreamComparator getStreamSort();
public abstract Explanation toExplanation(StreamFactory factory) throws IOException;
public int getCost() {
return 0;
}
public UUID getStreamNodeId(){
return streamNodeId;
}
}

View File

@ -26,7 +26,10 @@ import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.eq.StreamEqualitor;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
@ -85,16 +88,25 @@ public class UniqueStream extends TupleStream implements Expressible {
}
@Override
public StreamExpression toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// streams
if(originalStream instanceof Expressible){
expression.addParameter(((Expressible)originalStream).toExpression(factory));
if(includeStreams){
// streams
if(originalStream instanceof Expressible){
expression.addParameter(((Expressible)originalStream).toExpression(factory));
}
else{
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This UniqueStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
// over
@ -107,6 +119,21 @@ public class UniqueStream extends TupleStream implements Expressible {
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[] {
originalStream.toExplanation(factory)
// we're not including that this is wrapped with a ReducerStream stream because that's just an implementation detail
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
.withHelper(originalEqualitor.toExplanation(factory));
}
public void setStreamContext(StreamContext context) {
this.originalStream.setStreamContext(context);

View File

@ -30,10 +30,12 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.SolrInputDocument;
@ -143,21 +145,55 @@ public class UpdateStream extends TupleStream implements Expressible {
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
expression.addParameter(collection);
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
expression.addParameter(new StreamExpressionNamedParameter("batchSize", Integer.toString(updateBatchSize)));
if(tupleSource instanceof Expressible){
expression.addParameter(((Expressible)tupleSource).toExpression(factory));
} else {
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
if(includeStreams){
if(tupleSource instanceof Expressible){
expression.addParameter(((Expressible)tupleSource).toExpression(factory));
} else {
throw new IOException("This ParallelStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
expression.addParameter("<stream>");
}
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
// An update stream is backward wrt the order in the explanation. This stream is the "child"
// while the collection we're updating is the parent.
StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore");
explanation.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
explanation.setImplementingClass("Solr/Lucene");
explanation.setExpressionType(ExpressionType.DATASTORE);
explanation.setExpression("Update into " + collection);
// child is a datastore so add it at this point
StreamExplanation child = new StreamExplanation(getStreamNodeId().toString());
child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass())));
child.setImplementingClass(getClass().getName());
child.setExpressionType(ExpressionType.STREAM_DECORATOR);
child.setExpression(toExpression(factory, false).toString());
explanation.addChild(child);
return explanation;
}
@Override
public void setStreamContext(StreamContext context) {
this.cache = context.getSolrClientCache();

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream.expr;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Explanation containing details about a expression
*/
public class Explanation {
private String expressionNodeId;
private String expressionType;
private String functionName;
private String implementingClass;
private String expression;
private String note;
private List<Explanation> helpers;
public Explanation(String expressionNodeId){
this.expressionNodeId = expressionNodeId;
}
public String getExpressionNodeId(){
return expressionNodeId;
}
public String getExpressionType(){
return expressionType;
}
public void setExpressionType(String expressionType){
this.expressionType = expressionType;
}
public Explanation withExpressionType(String expressionType){
setExpressionType(expressionType);
return this;
}
public String getFunctionName(){
return functionName;
}
public void setFunctionName(String functionName){
this.functionName = functionName;
}
public Explanation withFunctionName(String functionName){
setFunctionName(functionName);
return this;
}
public String getImplementingClass(){
return implementingClass;
}
public void setImplementingClass(String implementingClass){
this.implementingClass = implementingClass;
}
public Explanation withImplementingClass(String implementingClass){
setImplementingClass(implementingClass);
return this;
}
public String getExpression(){
return expression;
}
public void setExpression(String expression){
this.expression = expression;
}
public Explanation withExpression(String expression){
setExpression(expression);
return this;
}
public String getNote(){
return note;
}
public void setNote(String note){
this.note = note;
}
public Explanation withNote(String note){
setNote(note);
return this;
}
public List<Explanation> getHelpers(){
return helpers;
}
public void setHelpers(List<Explanation> helpers){
this.helpers = helpers;
}
public Explanation withHelpers(List<Explanation> helpers){
setHelpers(helpers);
return this;
}
public Explanation withHelpers(Explanation[] helpers){
for(Explanation helper : helpers){
addHelper(helper);
}
return this;
}
public Explanation withHelper(Explanation helper){
addHelper(helper);
return this;
}
public void addHelper(Explanation helper){
if(null == helpers){
helpers = new ArrayList<Explanation>(1);
}
helpers.add(helper);
}
public Map<String,Object> toMap(){
Map<String,Object> map = new HashMap<String,Object>();
if(null != expressionNodeId){ map.put("expressionNodeId",expressionNodeId); }
if(null != expressionType){ map.put("expressionType",expressionType); }
if(null != functionName){ map.put("functionName",functionName); }
if(null != implementingClass){ map.put("implementingClass",implementingClass); }
if(null != expression){ map.put("expression",expression); }
if(null != note){ map.put("note",note); }
if(null != helpers && 0 != helpers.size()){
List<Map<String,Object>> helperMaps = new ArrayList<Map<String,Object>>();
for(Explanation helper : helpers){
helperMaps.add(helper.toMap());
}
map.put("helpers", helperMaps);
}
return map;
}
public static interface ExpressionType{
public static final String GRAPH_SOURCE = "graph-source";
public static final String STREAM_SOURCE = "stream-source";
public static final String STREAM_DECORATOR = "stream-decorator";
public static final String DATASTORE = "datastore";
public static final String METRIC = "metric";
public static final String OPERATION = "operation";
public static final String EQUALITOR = "equalitor";
public static final String SORTER = "sorter";
}
}

View File

@ -25,4 +25,13 @@ public interface Expressible {
// public String getFunctionName();
// public void setFunctionName(String functionName);
StreamExpressionParameter toExpression(StreamFactory factory) throws IOException;
/**
* Returns an explanation about the stream object
* @param factory Stream factory for this, contains information about the function name
* @return Explanation about this stream object containing explanations of any child stream objects
* @throws IOException throw on any error
*/
Explanation toExplanation(StreamFactory factory) throws IOException;
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream.expr;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* Explanation containing details about a stream expression
*/
public class StreamExplanation extends Explanation {
private List<Explanation> children;
public StreamExplanation(String expressionNodeId){
super(expressionNodeId);
}
public List<Explanation> getChildren(){
return children;
}
public void setChildren(List<Explanation> children){
this.children = children;
}
public StreamExplanation withChildren(List<Explanation> children){
setChildren(children);
return this;
}
public StreamExplanation withChildren(Explanation[] children){
for(Explanation child : children){
addChild(child);
}
return this;
}
public void addChild(Explanation child){
if(null == children){
children = new ArrayList<Explanation>(1);
}
children.add(child);
}
public Map<String,Object> toMap(){
Map<String,Object> map = super.toMap();
if(null != children && 0 != children.size()){
List<Map<String,Object>> childrenMaps = new ArrayList<Map<String,Object>>();
for(Explanation child : children){
childrenMaps.add(child.toMap());
}
map.put("children", childrenMaps);
}
return map;
}
}

View File

@ -16,14 +16,20 @@
*/
package org.apache.solr.client.solrj.io.stream.metrics;
import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public abstract class Metric implements Serializable, Expressible {
private static final long serialVersionUID = 1L;
private UUID metricNodeId = UUID.randomUUID();
private String functionName;
private String identifier;
@ -51,6 +57,19 @@ public abstract class Metric implements Serializable, Expressible {
this.identifier = sb.toString();
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(metricNodeId.toString())
.withFunctionName(functionName)
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString())
.withExpressionType(ExpressionType.METRIC);
}
public UUID getMetricNodeId(){
return metricNodeId;
}
public abstract double getValue();
public abstract void update(Tuple tuple);
public abstract Metric newInstance();

View File

@ -24,10 +24,13 @@ import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
public class RecordCountStream extends TupleStream implements Expressible, Serializable {
@ -54,20 +57,43 @@ public class RecordCountStream extends TupleStream implements Expressible, Seria
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
public StreamExpression toExpression(StreamFactory factory) throws IOException{
return toExpression(factory, true);
}
private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
// function name
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
if(includeStreams){
// stream
if(stream instanceof Expressible){
expression.addParameter(((Expressible)stream).toExpression(factory));
}
else{
throw new IOException("This RecordCountStream contains a non-expressible TupleStream - it cannot be converted to an expression");
}
}
else{
throw new IOException("This RecordCountStream contains a non-expressible TupleStream - it cannot be converted to an expression");
expression.addParameter("<stream>");
}
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withChildren(new Explanation[]{
stream.toExplanation(factory)
})
.withFunctionName(factory.getFunctionName(this.getClass()))
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression(toExpression(factory, false).toString())
;
}
public void close() throws IOException {
this.stream.close();

View File

@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import junit.framework.Assert;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.junit.Test;
/**
**/
public class StreamExpressionToExplanationTest extends LuceneTestCase {
private StreamFactory factory;
public StreamExpressionToExplanationTest() {
super();
factory = new StreamFactory()
.withCollectionZkHost("collection1", "testhost:1234")
.withCollectionZkHost("collection2", "testhost:1234")
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class)
.withFunctionName("count", CountMetric.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("daemon", DaemonStream.class)
.withFunctionName("topic", TopicStream.class)
;
}
@Test
public void testCloudSolrStream() throws Exception {
CloudSolrStream stream;
// Basic test
stream = new CloudSolrStream(StreamExpressionParser.parse("search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("search", explanation.getFunctionName());
Assert.assertEquals(CloudSolrStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testSelectStream() throws Exception {
SelectStream stream;
// Basic test
stream = new SelectStream(StreamExpressionParser.parse("select(\"a_s as fieldA\", search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"))"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("select", explanation.getFunctionName());
Assert.assertEquals(SelectStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testDaemonStream() throws Exception {
DaemonStream stream;
// Basic test
stream = new DaemonStream(StreamExpressionParser.parse("daemon(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), id=\"blah\", runInterval=\"1000\", queueSize=\"100\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("daemon", explanation.getFunctionName());
Assert.assertEquals(DaemonStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testTopicStream() throws Exception {
TopicStream stream;
// Basic test
stream = new TopicStream(StreamExpressionParser.parse("topic(collection2, collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", id=\"blah\", checkpointEvery=1000)"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("topic", explanation.getFunctionName());
Assert.assertEquals(TopicStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testStatsStream() throws Exception {
StatsStream stream;
// Basic test
stream = new StatsStream(StreamExpressionParser.parse("stats(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\", sum(a_i), avg(a_i), count(*), min(a_i), max(a_i))"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("stats", explanation.getFunctionName());
Assert.assertEquals(StatsStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testUniqueStream() throws Exception {
UniqueStream stream;
// Basic test
stream = new UniqueStream(StreamExpressionParser.parse("unique(search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_i asc\"), over=\"a_f\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("unique", explanation.getFunctionName());
Assert.assertEquals(UniqueStream.class.getName(), explanation.getImplementingClass());
}
@Test
public void testMergeStream() throws Exception {
MergeStream stream;
// Basic test
stream = new MergeStream(StreamExpressionParser.parse("merge("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f asc, a_s asc\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("merge", explanation.getFunctionName());
Assert.assertEquals(MergeStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testRankStream() throws Exception {
RankStream stream;
String expressionString;
// Basic test
stream = new RankStream(StreamExpressionParser.parse("top("
+ "n=3,"
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc,a_i asc\"),"
+ "sort=\"a_f asc, a_i asc\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("top", explanation.getFunctionName());
Assert.assertEquals(RankStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testReducerStream() throws Exception {
ReducerStream stream;
String expressionString;
// Basic test
stream = new ReducerStream(StreamExpressionParser.parse("reduce("
+ "search(collection1, q=*:*, fl=\"id,a_s,a_i,a_f\", sort=\"a_s desc, a_f asc\"),"
+ "by=\"a_s\", group(sort=\"a_i desc\", n=\"5\"))"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("reduce", explanation.getFunctionName());
Assert.assertEquals(ReducerStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testUpdateStream() throws Exception {
StreamExpression expression = StreamExpressionParser.parse("update("
+ "collection2, "
+ "batchSize=5, "
+ "search("
+ "collection1, "
+ "q=*:*, "
+ "fl=\"id,a_s,a_i,a_f\", "
+ "sort=\"a_f asc, a_i asc\"))");
UpdateStream updateStream = new UpdateStream(expression, factory);
Explanation explanation = updateStream.toExplanation(factory);
Assert.assertEquals("solr (collection2)", explanation.getFunctionName());
Assert.assertEquals("Solr/Lucene", explanation.getImplementingClass());
StreamExplanation updateExplanation = (StreamExplanation)explanation;
Assert.assertEquals(1, updateExplanation.getChildren().size());
Assert.assertEquals("update", updateExplanation.getChildren().get(0).getFunctionName());
Assert.assertEquals(UpdateStream.class.getName(), updateExplanation.getChildren().get(0).getImplementingClass());
}
@Test
public void testFacetStream() throws Exception {
FacetStream stream;
String expressionString;
// Basic test
stream = new FacetStream(StreamExpressionParser.parse("facet("
+ "collection1, "
+ "q=\"*:*\", "
+ "buckets=\"a_s\", "
+ "bucketSorts=\"sum(a_i) asc\", "
+ "bucketSizeLimit=100, "
+ "sum(a_i), sum(a_f), "
+ "min(a_i), min(a_f), "
+ "max(a_i), max(a_f), "
+ "avg(a_i), avg(a_f), "
+ "count(*)"
+ ")"), factory);
expressionString = stream.toExpression(factory).toString();
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("facet", explanation.getFunctionName());
Assert.assertEquals(FacetStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testJDBCStream() throws Exception {
JDBCStream stream;
String expressionString;
// Basic test
stream = new JDBCStream(StreamExpressionParser.parse("jdbc(connection=\"jdbc:hsqldb:mem:.\", sql=\"select PEOPLE.ID, PEOPLE.NAME, COUNTRIES.COUNTRY_NAME from PEOPLE inner join COUNTRIES on PEOPLE.COUNTRY_CODE = COUNTRIES.CODE order by PEOPLE.ID\", sort=\"ID asc\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("jdbc", explanation.getFunctionName());
Assert.assertEquals(JDBCStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(1, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testIntersectStream() throws Exception {
IntersectStream stream;
String expressionString;
// Basic test
stream = new IntersectStream(StreamExpressionParser.parse("intersect("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f, a_s\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("intersect", explanation.getFunctionName());
Assert.assertEquals(IntersectStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size());
}
@Test
public void testComplementStream() throws Exception {
ComplementStream stream;
String expressionString;
// Basic test
stream = new ComplementStream(StreamExpressionParser.parse("complement("
+ "search(collection1, q=\"id:(0 3 4)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "search(collection1, q=\"id:(1 2)\", fl=\"id,a_s,a_i,a_f\", sort=\"a_f asc, a_s asc\"),"
+ "on=\"a_f, a_s\")"), factory);
Explanation explanation = stream.toExplanation(factory);
Assert.assertEquals("complement", explanation.getFunctionName());
Assert.assertEquals(ComplementStream.class.getName(), explanation.getImplementingClass());
Assert.assertEquals(2, ((StreamExplanation)explanation).getChildren().size());
}
}

View File

@ -275,6 +275,7 @@ limitations under the License.
.sub-menu .overview a { background-image: url( ../../img/ico/home.png ); }
.sub-menu .query a { background-image: url( ../../img/ico/magnifier.png ); }
.sub-menu .stream a { background-image: url( ../../img/ico/node.png ); }
.sub-menu .analysis a { background-image: url( ../../img/ico/funnel.png ); }
.sub-menu .documents a { background-image: url( ../../img/ico/documents-stack.png ); }
.sub-menu .files a { background-image: url( ../../img/ico/folder.png ); }

View File

@ -0,0 +1,233 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#content #stream
{
}
#content #stream #form
{
float: top;
}
#content #stream #form label
{
cursor: pointer;
display: inline;
margin-top: 5px;
}
#content #stream #form input,
#content #stream #form select,
#content #stream #form textarea
{
margin-bottom: 2px;
width: 100%;
}
#content #stream #form textarea
{
height: 125px;
}
#content #stream #form #start
{
float: left;
width: 45%;
}
#content #stream #form #rows
{
float: right;
width: 45%;
}
#content #stream #form input[type=checkbox]
{
margin-bottom: 0;
margin-left: 5px;
margin-right: 0px;
width: 10px;
height: 10px;
display: inline;
}
#content #stream #form fieldset
{
border: 1px solid #fff;
border-top: 1px solid #c0c0c0;
margin-bottom: 5px;
}
#content #stream #form fieldset.common
{
margin-top: 10px;
}
#content #stream #form fieldset legend
{
display: block;
margin-left: 10px;
padding: 0px 5px;
}
#content #stream #form fieldset legend label
{
margin-top: 0;
}
#content #stream #form button
{
margin-right: 10px;
}
#content #stream #form fieldset .fieldset
{
border-bottom: 1px solid #f0f0f0;
margin-bottom: 5px;
padding-bottom: 10px;
}
#content #stream #result
{
float: bottom;
}
#content #stream #result #response
{
}
/************************/
#content #stream #result #explanation
{
border-top: 1px solid #f0f0f0;
border-bottom: 1px solid #f0f0f0;
border-left: 1px solid #f0f0f0;
border-right: 1px solid #f0f0f0;
}
#content #stream #result #explanation .loader
{
background-position: 0 50%;
padding-left: 21px;
}
#content #stream #result #explanation #error
{
background-color: #f00;
background-image: url( ../../img/ico/construction.png );
background-position: 10px 12px;
color: #fff;
font-weight: bold;
margin-bottom: 20px;
padding: 10px;
padding-left: 35px;
}
#content #stream #result #explanation #error .msg
{
font-style: italic;
font-weight: normal;
margin-top: 10px;
}
#content #stream #result #explanation .content
{
padding-left: 0;
padding-right: 0;
}
#content #stream #result #explanation .content.show
{
background-image: url( ../../img/div.gif );
background-repeat: repeat-y;
background-position: 31% 0;
}
#content #stream #result #explanation #legend
{
border: 1px solid #f0f0f0;
padding: 10px;
/*position: absolute;
right: 0;
bottom: 0;*/
}
#content #stream #result #explanation #legend li
{
padding-left: 15px;
position: relative;
-webkit-box-sizing: border-box;
}
#content #stream #result #explanation #legend li svg
{
position: absolute;
left: 0;
top: 2px;
}
#content #stream #result #explanation #graph-content
{
min-height: 50px;
width: 100%
}
#content #stream #result #explanation #graph-content .node circle
{
color: #c48f00;
stroke: #c48f00;
fill: #c48f00;
}
#content #stream #result #explanation #graph-content .link
{
fill: none;
stroke: #e0e0e0;
stroke-width: 1.5px;
}
#content #stream #result #explanation #legend .datastore circle,
#content #stream #result #explanation #graph-content .node.datastore circle
{
stroke: #3800c4;
fill: #3800c4;
}
#content #stream #result #explanation #legend .stream-source circle,
#content #stream #result #explanation #graph-content .node.stream-source circle
{
stroke: #21a9ec;
fill: #21a9ec;
}
#content #stream #result #explanation #legend .stream-decorator circle,
#content #stream #result #explanation #graph-content .node.stream-decorator circle
{
stroke: #cb21ec;
fill: #cb21ec;
}
#content #stream #result #explanation #legend .graph-source circle,
#content #stream #result #explanation #graph-content .node.graph-source circle
{
stroke: #21eca9;
fill: #21eca9;
}

View File

@ -38,6 +38,7 @@ limitations under the License.
<link rel="stylesheet" type="text/css" href="css/angular/plugins.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/documents.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/query.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/stream.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/replication.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/schema.css?_=${version}">
<link rel="stylesheet" type="text/css" href="css/angular/segments.css?_=${version}">
@ -72,6 +73,7 @@ limitations under the License.
<script src="js/angular/controllers/documents.js"></script>
<script src="js/angular/controllers/files.js"></script>
<script src="js/angular/controllers/query.js"></script>
<script src="js/angular/controllers/stream.js"></script>
<script src="js/angular/controllers/plugins.js"></script>
<script src="js/angular/controllers/replication.js"></script>
<script src="js/angular/controllers/schema.js"></script>
@ -183,6 +185,7 @@ limitations under the License.
<li class="documents" ng-class="{active:page=='documents'}"><a href="#/{{currentCollection.name}}/documents"><span>Documents</span></a></li>
<li class="files" ng-class="{active:page=='files'}"><a href="#/{{currentCollection.name}}/files"><span>Files</span></a></li>
<li class="query" ng-class="{active:page=='query'}"><a href="#/{{currentCollection.name}}/query"><span>Query</span></a></li>
<li class="stream" ng-class="{active:page=='stream'}"><a href="#/{{currentCollection.name}}/stream"><span>Stream</span></a></li>
<li class="schema" ng-class="{active:page=='schema'}"><a href="#/{{currentCollection.name}}/schema"><span>Schema</span></a></li>
</ul>
</div>

View File

@ -109,6 +109,10 @@ solrAdminApp.config([
templateUrl: 'partials/query.html',
controller: 'QueryController'
}).
when('/:core/stream', {
templateUrl: 'partials/stream.html',
controller: 'StreamController'
}).
when('/:core/replication', {
templateUrl: 'partials/replication.html',
controller: 'ReplicationController'

View File

@ -0,0 +1,251 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
solrAdminApp.controller('StreamController',
function($scope, $routeParams, $location, Query, Constants) {
$scope.resetMenu("stream", Constants.IS_COLLECTION_PAGE);
$scope.stream = {
wt: 'json',
expr: $scope.expr,
indent: 'on'
};
$scope.qt = "stream";
$scope.doExplanation = false
$scope.doStream = function() {
// alert("doing stream")
var params = {};
params.core = $routeParams.core;
params.handler = $scope.qt;
params.expr = [$scope.expr]
if($scope.doExplanation){
params.explain = [$scope.doExplanation]
}
$scope.lang = "json";
$scope.response = null;
$scope.url = "";
var url = Query.url(params);
Query.query(params, function(data) {
var jsonData = JSON.parse(data.toJSON().data);
if (undefined != jsonData["explanation"]) {
$scope.showExplanation = true;
graphSubController($scope, jsonData["explanation"])
delete jsonData["explanation"]
} else {
$scope.showExplanation = false;
}
data.data = JSON.stringify(jsonData,null,2);
$scope.lang = "json";
$scope.response = data;
$scope.url = $location.protocol() + "://" +
$location.host() + ":" +
$location.port() + url;
});
};
if ($location.search().expr) {
$scope.expr = $location.search()["expr"];
$scope.doStream();
}
}
);
var graphSubController = function($scope, explanation) {
$scope.showGraph = true;
$scope.pos = 0;
$scope.rows = 8;
$scope.helperData = {
protocol: [],
host: [],
hostname: [],
port: [],
pathname: []
};
$scope.resetGraph = function() {
$scope.pos = 0;
$scope.initGraph();
}
$scope.initGraph = function(explanation) {
data = explanation
var leafCount = 0;
var maxDepth = 0;
var rootNode = {};
leafCount = 0;
let recurse = function(dataNode, depth) {
if (depth > maxDepth) {
maxDepth = depth;
}
let graphNode = {
name: dataNode.expressionNodeId,
implementingClass: 'unknown',
data: {}
};
["expressionNodeId", "expressionType", "functionName", "implementingClass", "expression", "note", "helpers"].forEach(function(key) {
graphNode.data[key] = dataNode[key];
});
if (dataNode.children && dataNode.children.length > 0) {
graphNode.children = [];
dataNode.children.forEach(function(n) {
graphNode.children.push(recurse(n, depth + 1));
});
} else {
++leafCount;
}
return graphNode;
}
$scope.showPaging = false;
$scope.isRadial = false;
$scope.graphData = recurse(data, 1);
$scope.depth = maxDepth + 1;
$scope.leafCount = leafCount;
};
$scope.initGraph(explanation);
};
solrAdminApp.directive('foograph', function(Constants) {
return {
restrict: 'EA',
scope: {
data: "=",
leafCount: "=",
depth: "=",
helperData: "=",
isRadial: "="
},
link: function(scope, element, attrs) {
var helper_path_class = function(p) {
var classes = ['link'];
return classes.join(' ');
};
var helper_node_class = function(d) {
var classes = ['node'];
if (d.data && d.data.expressionType) {
classes.push(d.data.expressionType);
}
return classes.join(' ');
};
var helper_node_text = function(d) {
if (d.data && d.data.functionName) {
return d.data.functionName;
}
return d.name
};
var helper_tooltip = function(d) {
return [
"Function: " + d.data.functionName,
"Type: " + d.data.expressionType,
"Class: " + d.data.implementingClass.replace("org.apache.solr.client.solrj.io", "o.a.s.c.s.i"),
"=============",
d.data.expression
].join("\n");
}
scope.$watch("data", function(newValue, oldValue) {
if (newValue) {
flatGraph(element, scope.data, scope.depth, scope.leafCount);
}
});
var flatGraph = function(element, graphData, depth, leafCount) {
var w = 100 + (depth * 100),
h = leafCount * 40;
var tree = d3.layout.tree().size([h, w]);
var diagonal = d3.svg.diagonal().projection(function(d) {
return [d.y * .7, d.x];
});
d3.select('#canvas', element).html('');
var vis = d3.select('#canvas', element).append('svg')
.attr('width', w)
.attr('height', h)
.append('g')
.attr('transform', 'translate(25, 0)');
var nodes = tree.nodes(graphData);
var link = vis.selectAll('path.link')
.data(tree.links(nodes))
.enter().append('path')
.attr('class', helper_path_class)
.attr('d', diagonal);
var node = vis.selectAll('g.node')
.data(nodes)
.enter().append('g')
.attr('class', helper_node_class)
.attr('transform', function(d) {
return 'translate(' + d.y * .7 + ',' + d.x + ')';
})
node.append('circle')
.attr('r', 4.5);
node.append('title')
.text(helper_tooltip);
node.append('text')
.attr('dx', function(d) {
return 8;
})
.attr('dy', function(d) {
return 5;
})
.attr('text-anchor', function(d) {
return 'start';
})
.text(helper_node_text)
};
}
};
})

View File

@ -0,0 +1,64 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="stream" class="clearfix">
<div id="form">
<form>
<label for="expr" title="The expression string">
Streaming Expression (expr)
</label>
<textarea name="expr" ng-model="expr" id="expr" title="The expression string.">search(....)</textarea>
<button type="submit" ng-click="doStream()">Execute</button>
<input type="checkbox" ng-model="doExplanation" name="doExplanation" id="doExplanation" value="true">
<label for="explain" class="checkbox" title="Enable Explanation." ng-click="doExplanation = !doExplanation">
with explanation
</label>
</form>
</div>
<div id="result">
<a ng-show="response.data" id="url" class="address-bar" ng-href="{{url}}">{{url}}</a>
<div ng-show="showExplanation" id="explanation" class="clearfix">
<div id="frame">
<div foograph id="graph-content" data="graphData" depth="depth" leaf-count="leafCount" helper-data="helperData" is-radial="false" class="content clearfix" ng-show="showGraph">
<div id="legend">
<svg width="100%" height="15">
<g transform="translate(5,10)" class="stream-decorator"><circle r="4.5"></circle></g>
<g transform="translate(15,14)"><text>Stream Decorator</text></g>
<g transform="translate(140,10)" class="stream-source"><circle r="4.5"></circle></g>
<g transform="translate(150,14)"><text>Stream Source</text></g>
<g transform="translate(260,10)" class="graph-source"><circle r="4.5"></circle></g>
<g transform="translate(270,14)"><text>Graph Source</text></g>
<g transform="translate(375,10)" class="datastore"><circle r="4.5"></circle></g>
<g transform="translate(385,14)"><text>Datastore</text></g>
</svg>
</div>
<div id="canvas"></div>
</div>
</div>
</div>
<div id="response">
<pre class="syntax language-json"><code ng-bind-html="response.data | highlight:json | unsafe"></code></pre>
</div>
</div>
</div>