SOLR-10882: ArrayEvaluator now works with all types and allows sorts (deleted ArraySortEvaluator)

This commit is contained in:
Dennis Gove 2017-06-15 22:10:37 -04:00
parent 5fca6a4d82
commit 113459a840
5 changed files with 444 additions and 317 deletions

View File

@ -77,7 +77,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private StreamFactory streamFactory = new StreamFactory();
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
private Map<String,DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@ -89,202 +89,202 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
public void inform(SolrCore core) {
/* The stream factory will always contain the zkUrl for the given collection
* Adds default streams with their corresponding function names. These
* defaults can be overridden or added to in the solrConfig in the stream
* RequestHandler def. Example config override
* <lst name="streamFunctions">
* <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
* <str name="count">org.apache.solr.client.solrj.io.stream.RecordCountStream</str>
* </lst>
* */
/*
* The stream factory will always contain the zkUrl for the given collection Adds default streams with their
* corresponding function names. These defaults can be overridden or added to in the solrConfig in the stream
* RequestHandler def. Example config override
* <lst name="streamFunctions">
* <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
* <str name="count">org.apache.solr.client.solrj.io.stream.RecordCountStream</str>
* </lst>
*/
String defaultCollection;
String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer();
this.coreName = core.getName();
if(coreContainer.isZooKeeperAware()) {
if (coreContainer.isZooKeeperAware()) {
defaultCollection = core.getCoreDescriptor().getCollectionName();
defaultZkhost = core.getCoreContainer().getZkController().getZkServerAddress();
streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
streamFactory.withDefaultZkHost(defaultZkhost);
modelCache = new ModelCache(250,
defaultZkhost,
clientCache);
defaultZkhost,
clientCache);
}
streamFactory
// source streams
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("topic", TopicStream.class)
.withFunctionName("commit", CommitStream.class)
.withFunctionName("random", RandomStream.class)
.withFunctionName("knn", KnnStream.class)
// decorator streams
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class)
.withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class)
.withFunctionName(SORT, SortStream.class)
.withFunctionName("train", TextLogitStream.class)
.withFunctionName("features", FeaturesSelectionStream.class)
.withFunctionName("daemon", DaemonStream.class)
.withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("nodes", GatherNodesStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("nodes", GatherNodesStream.class)
.withFunctionName("scoreNodes", ScoreNodesStream.class)
.withFunctionName("model", ModelStream.class)
.withFunctionName("classify", ClassifyStream.class)
.withFunctionName("fetch", FetchStream.class)
.withFunctionName("executor", ExecutorStream.class)
.withFunctionName("null", NullStream.class)
.withFunctionName("priority", PriorityStream.class)
.withFunctionName("significantTerms", SignificantTermsStream.class)
.withFunctionName("cartesianProduct", CartesianProductStream.class)
.withFunctionName("shuffle", ShuffleStream.class)
.withFunctionName("calc", CalculatorStream.class)
.withFunctionName("eval",EvalStream.class)
.withFunctionName("echo", EchoStream.class)
.withFunctionName("cell", CellStream.class)
.withFunctionName("list", ListStream.class)
.withFunctionName("let", LetStream.class)
.withFunctionName("get", GetStream.class)
.withFunctionName("timeseries", TimeSeriesStream.class)
.withFunctionName("tuple", TupStream.class)
.withFunctionName("sql", SqlStream.class)
.withFunctionName("col", ColumnEvaluator.class)
.withFunctionName("predict", PredictEvaluator.class)
.withFunctionName("regress", RegressionEvaluator.class)
.withFunctionName("cov", CovarianceEvaluator.class)
.withFunctionName("conv", ConvolutionEvaluator.class)
.withFunctionName("normalize", NormalizeEvaluator.class)
.withFunctionName("rev", ReverseEvaluator.class)
.withFunctionName("length", LengthEvaluator.class)
.withFunctionName("rank", RankEvaluator.class)
.withFunctionName("scale", ScaleEvaluator.class)
.withFunctionName("distance", DistanceEvaluator.class)
.withFunctionName("copyOf", CopyOfEvaluator.class)
.withFunctionName("copyOfRange", CopyOfRangeEvaluator.class)
.withFunctionName("percentile", PercentileEvaluator.class)
.withFunctionName("empiricalDistribution", EmpiricalDistributionEvaluator.class)
.withFunctionName("cumulativeProbability", CumulativeProbabilityEvaluator.class)
.withFunctionName("describe", DescribeEvaluator.class)
.withFunctionName("finddelay", FindDelayEvaluator.class)
.withFunctionName("sequence", SequenceEvaluator.class)
.withFunctionName("array", ArrayEvaluator.class)
.withFunctionName("hist", HistogramEvaluator.class)
.withFunctionName("anova", AnovaEvaluator.class)
.withFunctionName("movingAvg", MovingAverageEvaluator.class)
.withFunctionName("arraySort", ArraySortEvaluator.class)
streamFactory
// source streams
.withFunctionName("search", CloudSolrStream.class)
.withFunctionName("facet", FacetStream.class)
.withFunctionName("update", UpdateStream.class)
.withFunctionName("jdbc", JDBCStream.class)
.withFunctionName("topic", TopicStream.class)
.withFunctionName("commit", CommitStream.class)
.withFunctionName("random", RandomStream.class)
.withFunctionName("knn", KnnStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("count", CountMetric.class)
// tuple manipulation operations
.withFunctionName("replace", ReplaceOperation.class)
.withFunctionName("concat", ConcatOperation.class)
// stream reduction operations
.withFunctionName("group", GroupOperation.class)
.withFunctionName("distinct", DistinctOperation.class)
.withFunctionName("having", HavingStream.class)
// Stream Evaluators
.withFunctionName("val", RawValueEvaluator.class)
// Boolean Stream Evaluators
.withFunctionName("and", AndEvaluator.class)
.withFunctionName("eor", ExclusiveOrEvaluator.class)
.withFunctionName("eq", EqualsEvaluator.class)
.withFunctionName("gt", GreaterThanEvaluator.class)
.withFunctionName("gteq", GreaterThanEqualToEvaluator.class)
.withFunctionName("lt", LessThanEvaluator.class)
.withFunctionName("lteq", LessThanEqualToEvaluator.class)
.withFunctionName("not", NotEvaluator.class)
.withFunctionName("or", OrEvaluator.class)
// decorator streams
.withFunctionName("merge", MergeStream.class)
.withFunctionName("unique", UniqueStream.class)
.withFunctionName("top", RankStream.class)
.withFunctionName("group", GroupOperation.class)
.withFunctionName("reduce", ReducerStream.class)
.withFunctionName("parallel", ParallelStream.class)
.withFunctionName("rollup", RollupStream.class)
.withFunctionName("stats", StatsStream.class)
.withFunctionName("innerJoin", InnerJoinStream.class)
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
.withFunctionName("hashJoin", HashJoinStream.class)
.withFunctionName("outerHashJoin", OuterHashJoinStream.class)
.withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class)
.withFunctionName(SORT, SortStream.class)
.withFunctionName("train", TextLogitStream.class)
.withFunctionName("features", FeaturesSelectionStream.class)
.withFunctionName("daemon", DaemonStream.class)
.withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("nodes", GatherNodesStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("nodes", GatherNodesStream.class)
.withFunctionName("scoreNodes", ScoreNodesStream.class)
.withFunctionName("model", ModelStream.class)
.withFunctionName("classify", ClassifyStream.class)
.withFunctionName("fetch", FetchStream.class)
.withFunctionName("executor", ExecutorStream.class)
.withFunctionName("null", NullStream.class)
.withFunctionName("priority", PriorityStream.class)
.withFunctionName("significantTerms", SignificantTermsStream.class)
.withFunctionName("cartesianProduct", CartesianProductStream.class)
.withFunctionName("shuffle", ShuffleStream.class)
.withFunctionName("calc", CalculatorStream.class)
.withFunctionName("eval", EvalStream.class)
.withFunctionName("echo", EchoStream.class)
.withFunctionName("cell", CellStream.class)
.withFunctionName("list", ListStream.class)
.withFunctionName("let", LetStream.class)
.withFunctionName("get", GetStream.class)
.withFunctionName("timeseries", TimeSeriesStream.class)
.withFunctionName("tuple", TupStream.class)
.withFunctionName("sql", SqlStream.class)
// Date Time Evaluators
.withFunctionName(TemporalEvaluatorYear.FUNCTION_NAME, TemporalEvaluatorYear.class)
.withFunctionName(TemporalEvaluatorMonth.FUNCTION_NAME, TemporalEvaluatorMonth.class)
.withFunctionName(TemporalEvaluatorDay.FUNCTION_NAME, TemporalEvaluatorDay.class)
.withFunctionName(TemporalEvaluatorDayOfYear.FUNCTION_NAME, TemporalEvaluatorDayOfYear.class)
.withFunctionName(TemporalEvaluatorHour.FUNCTION_NAME, TemporalEvaluatorHour.class)
.withFunctionName(TemporalEvaluatorMinute.FUNCTION_NAME, TemporalEvaluatorMinute.class)
.withFunctionName(TemporalEvaluatorSecond.FUNCTION_NAME, TemporalEvaluatorSecond.class)
.withFunctionName(TemporalEvaluatorEpoch.FUNCTION_NAME, TemporalEvaluatorEpoch.class)
.withFunctionName(TemporalEvaluatorWeek.FUNCTION_NAME, TemporalEvaluatorWeek.class)
.withFunctionName(TemporalEvaluatorQuarter.FUNCTION_NAME, TemporalEvaluatorQuarter.class)
.withFunctionName(TemporalEvaluatorDayOfQuarter.FUNCTION_NAME, TemporalEvaluatorDayOfQuarter.class)
// metrics
.withFunctionName("min", MinMetric.class)
.withFunctionName("max", MaxMetric.class)
.withFunctionName("avg", MeanMetric.class)
.withFunctionName("sum", SumMetric.class)
.withFunctionName("count", CountMetric.class)
// Number Stream Evaluators
.withFunctionName("abs", AbsoluteValueEvaluator.class)
.withFunctionName("add", AddEvaluator.class)
.withFunctionName("div", DivideEvaluator.class)
.withFunctionName("mult", MultiplyEvaluator.class)
.withFunctionName("sub", SubtractEvaluator.class)
.withFunctionName("log", NaturalLogEvaluator.class)
.withFunctionName("pow", PowerEvaluator.class)
.withFunctionName("mod", ModuloEvaluator.class)
.withFunctionName("ceil", CeilingEvaluator.class)
.withFunctionName("floor", FloorEvaluator.class)
.withFunctionName("sin", SineEvaluator.class)
.withFunctionName("asin", ArcSineEvaluator.class)
.withFunctionName("sinh", HyperbolicSineEvaluator.class)
.withFunctionName("cos", CosineEvaluator.class)
.withFunctionName("acos", ArcCosineEvaluator.class)
.withFunctionName("cosh", HyperbolicCosineEvaluator.class)
.withFunctionName("tan", TangentEvaluator.class)
.withFunctionName("atan", ArcTangentEvaluator.class)
.withFunctionName("tanh", HyperbolicTangentEvaluator.class)
.withFunctionName("round", RoundEvaluator.class)
.withFunctionName("sqrt", SquareRootEvaluator.class)
.withFunctionName("cbrt", CubedRootEvaluator.class)
.withFunctionName("coalesce", CoalesceEvaluator.class)
.withFunctionName("uuid", UuidEvaluator.class)
.withFunctionName("corr", CorrelationEvaluator.class)
// tuple manipulation operations
.withFunctionName("replace", ReplaceOperation.class)
.withFunctionName("concat", ConcatOperation.class)
// stream reduction operations
.withFunctionName("group", GroupOperation.class)
.withFunctionName("distinct", DistinctOperation.class)
.withFunctionName("having", HavingStream.class)
// Conditional Stream Evaluators
.withFunctionName("if", IfThenElseEvaluator.class)
.withFunctionName("analyze", AnalyzeEvaluator.class)
.withFunctionName("convert", ConversionEvaluator.class)
;
// Stream Evaluators
.withFunctionName("val", RawValueEvaluator.class)
// New Evaluators
.withFunctionName("anova", AnovaEvaluator.class)
.withFunctionName("array", ArrayEvaluator.class)
.withFunctionName("col", ColumnEvaluator.class)
.withFunctionName("conv", ConvolutionEvaluator.class)
.withFunctionName("copyOf", CopyOfEvaluator.class)
.withFunctionName("copyOfRange", CopyOfRangeEvaluator.class)
.withFunctionName("cov", CovarianceEvaluator.class)
.withFunctionName("cumulativeProbability", CumulativeProbabilityEvaluator.class)
.withFunctionName("describe", DescribeEvaluator.class)
.withFunctionName("distance", DistanceEvaluator.class)
.withFunctionName("empiricalDistribution", EmpiricalDistributionEvaluator.class)
.withFunctionName("finddelay", FindDelayEvaluator.class)
.withFunctionName("hist", HistogramEvaluator.class)
.withFunctionName("length", LengthEvaluator.class)
.withFunctionName("movingAvg", MovingAverageEvaluator.class)
.withFunctionName("normalize", NormalizeEvaluator.class)
.withFunctionName("percentile", PercentileEvaluator.class)
.withFunctionName("predict", PredictEvaluator.class)
.withFunctionName("rank", RankEvaluator.class)
.withFunctionName("regress", RegressionEvaluator.class)
.withFunctionName("rev", ReverseEvaluator.class)
.withFunctionName("scale", ScaleEvaluator.class)
.withFunctionName("sequence", SequenceEvaluator.class)
// This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) {
Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
streamFactory.withFunctionName(pluginInfo.name, clazz);
}
// Boolean Stream Evaluators
.withFunctionName("and", AndEvaluator.class)
.withFunctionName("eor", ExclusiveOrEvaluator.class)
.withFunctionName("eq", EqualsEvaluator.class)
.withFunctionName("gt", GreaterThanEvaluator.class)
.withFunctionName("gteq", GreaterThanEqualToEvaluator.class)
.withFunctionName("lt", LessThanEvaluator.class)
.withFunctionName("lteq", LessThanEqualToEvaluator.class)
.withFunctionName("not", NotEvaluator.class)
.withFunctionName("or", OrEvaluator.class)
// Date Time Evaluators
.withFunctionName(TemporalEvaluatorYear.FUNCTION_NAME, TemporalEvaluatorYear.class)
.withFunctionName(TemporalEvaluatorMonth.FUNCTION_NAME, TemporalEvaluatorMonth.class)
.withFunctionName(TemporalEvaluatorDay.FUNCTION_NAME, TemporalEvaluatorDay.class)
.withFunctionName(TemporalEvaluatorDayOfYear.FUNCTION_NAME, TemporalEvaluatorDayOfYear.class)
.withFunctionName(TemporalEvaluatorHour.FUNCTION_NAME, TemporalEvaluatorHour.class)
.withFunctionName(TemporalEvaluatorMinute.FUNCTION_NAME, TemporalEvaluatorMinute.class)
.withFunctionName(TemporalEvaluatorSecond.FUNCTION_NAME, TemporalEvaluatorSecond.class)
.withFunctionName(TemporalEvaluatorEpoch.FUNCTION_NAME, TemporalEvaluatorEpoch.class)
.withFunctionName(TemporalEvaluatorWeek.FUNCTION_NAME, TemporalEvaluatorWeek.class)
.withFunctionName(TemporalEvaluatorQuarter.FUNCTION_NAME, TemporalEvaluatorQuarter.class)
.withFunctionName(TemporalEvaluatorDayOfQuarter.FUNCTION_NAME, TemporalEvaluatorDayOfQuarter.class)
// Number Stream Evaluators
.withFunctionName("abs", AbsoluteValueEvaluator.class)
.withFunctionName("add", AddEvaluator.class)
.withFunctionName("div", DivideEvaluator.class)
.withFunctionName("mult", MultiplyEvaluator.class)
.withFunctionName("sub", SubtractEvaluator.class)
.withFunctionName("log", NaturalLogEvaluator.class)
.withFunctionName("pow", PowerEvaluator.class)
.withFunctionName("mod", ModuloEvaluator.class)
.withFunctionName("ceil", CeilingEvaluator.class)
.withFunctionName("floor", FloorEvaluator.class)
.withFunctionName("sin", SineEvaluator.class)
.withFunctionName("asin", ArcSineEvaluator.class)
.withFunctionName("sinh", HyperbolicSineEvaluator.class)
.withFunctionName("cos", CosineEvaluator.class)
.withFunctionName("acos", ArcCosineEvaluator.class)
.withFunctionName("cosh", HyperbolicCosineEvaluator.class)
.withFunctionName("tan", TangentEvaluator.class)
.withFunctionName("atan", ArcTangentEvaluator.class)
.withFunctionName("tanh", HyperbolicTangentEvaluator.class)
.withFunctionName("round", RoundEvaluator.class)
.withFunctionName("sqrt", SquareRootEvaluator.class)
.withFunctionName("cbrt", CubedRootEvaluator.class)
.withFunctionName("coalesce", CoalesceEvaluator.class)
.withFunctionName("uuid", UuidEvaluator.class)
.withFunctionName("corr", CorrelationEvaluator.class)
// Conditional Stream Evaluators
.withFunctionName("if", IfThenElseEvaluator.class)
.withFunctionName("analyze", AnalyzeEvaluator.class)
.withFunctionName("convert", ConversionEvaluator.class);
// This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) {
Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
streamFactory.withFunctionName(pluginInfo.name, clazz);
}
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
//To change body of implemented methods use File | Settings | File Templates.
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
@ -299,7 +299,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
params = adjustParams(params);
req.setParams(params);
if(params.get("action") != null) {
if (params.get("action") != null) {
handleAdmin(req, rsp, params);
return;
}
@ -308,7 +308,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
try {
StreamExpression streamExpression = StreamExpressionParser.parse(params.get("expr"));
if(this.streamFactory.isEvaluator(streamExpression)) {
if (this.streamFactory.isEvaluator(streamExpression)) {
StreamExpression tupleExpression = new StreamExpression("tuple");
tupleExpression.addParameter(new StreamExpressionNamedParameter("return-value", streamExpression));
tupleStream = this.streamFactory.constructStream(tupleExpression);
@ -316,7 +316,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
tupleStream = this.streamFactory.constructStream(streamExpression);
}
} catch (Exception e) {
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
// Catch exceptions that occur while the stream is being created. This will include streaming expression parse
// rules.
SolrException.log(logger, e);
rsp.add("result-set", new DummyErrorStream(e));
@ -334,21 +335,21 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.put("core", this.coreName);
context.put("solr-core", req.getCore());
tupleStream.setStreamContext(context);
// if asking for explanation then go get it
if(params.getBool("explain", false)){
if (params.getBool("explain", false)) {
rsp.add("explanation", tupleStream.toExplanation(this.streamFactory));
}
if(tupleStream instanceof DaemonStream) {
DaemonStream daemonStream = (DaemonStream)tupleStream;
if(daemons.containsKey(daemonStream.getId())) {
if (tupleStream instanceof DaemonStream) {
DaemonStream daemonStream = (DaemonStream) tupleStream;
if (daemons.containsKey(daemonStream.getId())) {
daemons.remove(daemonStream.getId()).close();
}
daemonStream.setDaemons(daemons);
daemonStream.open(); //This will start the deamonStream
daemonStream.open(); // This will start the deamonStream
daemons.put(daemonStream.getId(), daemonStream);
rsp.add("result-set", new DaemonResponseStream("Deamon:"+daemonStream.getId()+" started on "+coreName));
rsp.add("result-set", new DaemonResponseStream("Deamon:" + daemonStream.getId() + " started on " + coreName));
} else {
rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream)));
}
@ -356,10 +357,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
String action = params.get("action");
if("stop".equalsIgnoreCase(action)) {
if ("stop".equalsIgnoreCase(action)) {
String id = params.get(ID);
DaemonStream d = daemons.get(id);
if(d != null) {
if (d != null) {
d.close();
rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " stopped on " + coreName));
} else {
@ -400,50 +401,46 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
public static class DummyErrorStream extends TupleStream {
private Exception e;
public DummyErrorStream(Exception e) {
this.e = e;
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void close() {}
public void open() {
}
public void open() {}
public void setStreamContext(StreamContext context) {
}
public void setStreamContext(StreamContext context) {}
public List<TupleStream> children() {
return null;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("error")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
.withFunctionName("error")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
String msg = e.getMessage();
Throwable t = e.getCause();
while(t != null) {
while (t != null) {
msg = t.getMessage();
t = t.getCause();
}
Map m = new HashMap();
m.put("EOF", true);
m.put("EXCEPTION", msg);
@ -457,18 +454,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public DaemonCollectionStream(Collection<DaemonStream> col) {
this.it = col.iterator();
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void close() {}
public void open() {
}
public void open() {}
public void setStreamContext(StreamContext context) {
}
public void setStreamContext(StreamContext context) {}
public List<TupleStream> children() {
return null;
@ -478,14 +473,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("daemon-collection")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
.withFunctionName("daemon-collection")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
if(it.hasNext()) {
if (it.hasNext()) {
return it.next().getInfo();
} else {
Map m = new HashMap();
@ -502,18 +497,16 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public DaemonResponseStream(String message) {
this.message = message;
}
public StreamComparator getStreamSort() {
return null;
}
public void close() {
}
public void close() {}
public void open() {
}
public void open() {}
public void setStreamContext(StreamContext context) {
}
public void setStreamContext(StreamContext context) {}
public List<TupleStream> children() {
return null;
@ -523,10 +516,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("daemon-response")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
.withFunctionName("daemon-response")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() {
@ -537,7 +530,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
} else {
sendEOF = true;
Map m = new HashMap();
m.put("DaemonOp",message);
m.put("DaemonOp", message);
return new Tuple(m);
}
}
@ -577,15 +570,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("timer")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
.withFunctionName("timer")
.withImplementingClass(this.getClass().getName())
.withExpressionType(ExpressionType.STREAM_DECORATOR)
.withExpression("--non-expressible--");
}
public Tuple read() throws IOException {
Tuple tuple = this.tupleStream.read();
if(tuple.EOF) {
if (tuple.EOF) {
long totalTime = (System.nanoTime() - begin) / 1000000;
tuple.fields.put("RESPONSE_TIME", totalTime);
}
@ -593,25 +586,25 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
}
private Map<String, List<String>> getCollectionShards(SolrParams params) {
private Map<String,List<String>> getCollectionShards(SolrParams params) {
Map<String, List<String>> collectionShards = new HashMap();
Map<String,List<String>> collectionShards = new HashMap();
Iterator<String> paramsIt = params.getParameterNamesIterator();
while(paramsIt.hasNext()) {
while (paramsIt.hasNext()) {
String param = paramsIt.next();
if(param.indexOf(".shards") > -1) {
if (param.indexOf(".shards") > -1) {
String collection = param.split("\\.")[0];
String shardString = params.get(param);
String[] shards = shardString.split(",");
List<String> shardList = new ArrayList();
for(String shard : shards) {
for (String shard : shards) {
shardList.add(shard);
}
collectionShards.put(collection, shardList);
}
}
if(collectionShards.size() > 0) {
if (collectionShards.size() > 0) {
return collectionShards;
} else {
return null;

View File

@ -18,30 +18,72 @@ package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import com.google.common.collect.Lists;
public class ArrayEvaluator extends ComplexEvaluator implements Expressible {
private static final long serialVersionUID = 1;
private String sortOrder;
public ArrayEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
super(expression, factory, Lists.newArrayList("sort"));
sortOrder = extractSortOrder(expression, factory);
}
private String extractSortOrder(StreamExpression expression, StreamFactory factory) throws IOException{
StreamExpressionNamedParameter sortParam = factory.getNamedOperand(expression, "sort");
if(null == sortParam){
return null; // this is ok
}
if(sortParam.getParameter() instanceof StreamExpressionValue){
String sortOrder = ((StreamExpressionValue)sortParam.getParameter()).getValue().trim().toLowerCase(Locale.ROOT);
if("asc".equals(sortOrder) || "desc".equals(sortOrder)){
return sortOrder;
}
}
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - invalid 'sort' parameter - expecting either 'asc' or 'desc'", expression));
}
public List<Object> evaluate(Tuple tuple) throws IOException {
List<Object> list = new ArrayList<>();
for(StreamEvaluator subEvaluator : subEvaluators) {
Object value = (Number)subEvaluator.evaluate(tuple);
Object value = subEvaluator.evaluate(tuple);
// if we want sorting but the evaluated value is not comparable then we have an error
if(null != sortOrder && !(value instanceof Comparable<?>)){
String message = String.format(Locale.ROOT,"Failed to evaluate to a comparable object - evaluator '%s' resulted in type '%s' and value '%s'",
subEvaluator.toExpression(constructingFactory),
value.getClass().getName(),
value.toString());
throw new IOException(message);
}
list.add(value);
}
if(null != sortOrder){
// Because of the type checking above we know that the value is at least Comparable
Comparator<Comparable> comparator = "asc".equals(sortOrder) ? (left,right) -> left.compareTo(right) : (left,right) -> right.compareTo(left);
list = list.stream().map(value -> (Comparable<Object>)value).sorted(comparator).collect(Collectors.toList());
}
return list;
}

View File

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ArraySortEvaluator extends ComplexEvaluator implements Expressible {
private static final long serialVersionUID = 1;
public ArraySortEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
super(expression, factory);
}
public List<Number> evaluate(Tuple tuple) throws IOException {
if(subEvaluators.size() != 1) {
throw new IOException("Array sort evaluator expects 1 parameters found: "+subEvaluators.size());
}
StreamEvaluator colEval1 = subEvaluators.get(0);
List<Number> numbers1 = (List<Number>)colEval1.evaluate(tuple);
List<Number> numbers2 = new ArrayList();
numbers2.addAll(numbers1);
Collections.sort(numbers2, new Comparator<Number>() {
@Override
public int compare(Number o1, Number o2) {
Double d1 = o1.doubleValue();
Double d2 = o2.doubleValue();
return d1.compareTo(d2);
}
});
return numbers2;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new Explanation(nodeId.toString())
.withExpressionType(ExpressionType.EVALUATOR)
.withFunctionName(factory.getFunctionName(getClass()))
.withImplementingClass(getClass().getName())
.withExpression(toExpression(factory).toString());
}
}

View File

@ -20,7 +20,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
@ -40,6 +42,10 @@ public abstract class ComplexEvaluator implements StreamEvaluator {
protected List<StreamEvaluator> subEvaluators = new ArrayList<StreamEvaluator>();
public ComplexEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
this(expression, factory, new ArrayList<>());
}
public ComplexEvaluator(StreamExpression expression, StreamFactory factory, List<String> ignoredNamedParameters) throws IOException{
constructingFactory = factory;
// We have to do this because order of the parameters matter
@ -75,8 +81,16 @@ public abstract class ComplexEvaluator implements StreamEvaluator {
}
}
if(expression.getParameters().size() != subEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found - expecting only StreamEvaluators or field names", expression));
Set<String> namedParameters = factory.getNamedOperands(expression).stream().map(param -> param.getName()).collect(Collectors.toSet());
long ignorableCount = ignoredNamedParameters.stream().filter(name -> namedParameters.contains(name)).count();
if(0 != expression.getParameters().size() - subEvaluators.size() - ignorableCount){
if(namedParameters.isEmpty()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found - expecting only StreamEvaluators or field names", expression));
}
else{
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - unknown operands found - expecting only StreamEvaluators, field names, or named parameters [%s]", expression, namedParameters.stream().collect(Collectors.joining(","))));
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream.eval;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.ArrayEvaluator;
import org.apache.solr.client.solrj.io.eval.StreamEvaluator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.junit.Test;
import junit.framework.Assert;
public class ArrayEvaluatorTest extends LuceneTestCase {
StreamFactory factory;
Map<String, Object> values;
public ArrayEvaluatorTest() {
super();
factory = new StreamFactory()
.withFunctionName("array", ArrayEvaluator.class);
values = new HashMap<String,Object>();
}
@Test
public void arrayLongSortAscTest() throws IOException{
StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=asc)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
values.put("a", 1L);
values.put("b", 3L);
values.put("c", 2L);
result = evaluator.evaluate(new Tuple(values));
Assert.assertTrue(result instanceof List<?>);
Assert.assertEquals(3, ((List<?>)result).size());
Assert.assertEquals(1L, ((List<?>)result).get(0));
Assert.assertEquals(2L, ((List<?>)result).get(1));
Assert.assertEquals(3L, ((List<?>)result).get(2));
}
@Test
public void arrayLongSortDescTest() throws IOException{
StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=desc)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
values.put("a", 1L);
values.put("b", 3L);
values.put("c", 2L);
result = evaluator.evaluate(new Tuple(values));
Assert.assertTrue(result instanceof List<?>);
Assert.assertEquals(3, ((List<?>)result).size());
Assert.assertEquals(3L, ((List<?>)result).get(0));
Assert.assertEquals(2L, ((List<?>)result).get(1));
Assert.assertEquals(1L, ((List<?>)result).get(2));
}
@Test
public void arrayStringSortAscTest() throws IOException{
StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=asc)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
values.put("a", "a");
values.put("b", "c");
values.put("c", "b");
result = evaluator.evaluate(new Tuple(values));
Assert.assertTrue(result instanceof List<?>);
Assert.assertEquals(3, ((List<?>)result).size());
Assert.assertEquals("a", ((List<?>)result).get(0));
Assert.assertEquals("b", ((List<?>)result).get(1));
Assert.assertEquals("c", ((List<?>)result).get(2));
}
@Test
public void arrayStringSortDescTest() throws IOException{
StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c, sort=desc)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
values.put("a", "a");
values.put("b", "c");
values.put("c", "b");
result = evaluator.evaluate(new Tuple(values));
Assert.assertTrue(result instanceof List<?>);
Assert.assertEquals(3, ((List<?>)result).size());
Assert.assertEquals("c", ((List<?>)result).get(0));
Assert.assertEquals("b", ((List<?>)result).get(1));
Assert.assertEquals("a", ((List<?>)result).get(2));
}
@Test
public void arrayStringUnsortedTest() throws IOException{
StreamEvaluator evaluator = factory.constructEvaluator("array(a,b,c)");
StreamContext context = new StreamContext();
evaluator.setStreamContext(context);
Object result;
values.put("a", "a");
values.put("b", "c");
values.put("c", "b");
result = evaluator.evaluate(new Tuple(values));
Assert.assertTrue(result instanceof List<?>);
Assert.assertEquals(3, ((List<?>)result).size());
Assert.assertEquals("a", ((List<?>)result).get(0));
Assert.assertEquals("c", ((List<?>)result).get(1));
Assert.assertEquals("b", ((List<?>)result).get(2));
}
}