mirror of https://github.com/apache/lucene.git
SOLR-10303: Add the tuple context to avoid creating multiple LocalDateTime instances for the same Tuple
This commit is contained in:
parent
b78a270c9d
commit
5e403647de
|
@ -185,14 +185,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName(SORT, SortStream.class)
|
||||
.withFunctionName("train", TextLogitStream.class)
|
||||
.withFunctionName("features", FeaturesSelectionStream.class)
|
||||
.withFunctionName("daemon", DaemonStream.class)
|
||||
.withFunctionName("daemon", DaemonStream.class)
|
||||
.withFunctionName("shortestPath", ShortestPathStream.class)
|
||||
.withFunctionName("gatherNodes", GatherNodesStream.class)
|
||||
.withFunctionName("nodes", GatherNodesStream.class)
|
||||
.withFunctionName("select", SelectStream.class)
|
||||
.withFunctionName("shortestPath", ShortestPathStream.class)
|
||||
.withFunctionName("gatherNodes", GatherNodesStream.class)
|
||||
.withFunctionName("nodes", GatherNodesStream.class)
|
||||
.withFunctionName("nodes", GatherNodesStream.class)
|
||||
.withFunctionName("scoreNodes", ScoreNodesStream.class)
|
||||
.withFunctionName("model", ModelStream.class)
|
||||
.withFunctionName("classify", ClassifyStream.class)
|
||||
|
@ -202,29 +202,29 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName("priority", PriorityStream.class)
|
||||
.withFunctionName("significantTerms", SignificantTermsStream.class)
|
||||
.withFunctionName("cartesianProduct", CartesianProductStream.class)
|
||||
.withFunctionName("shuffle", ShuffleStream.class)
|
||||
|
||||
// metrics
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("shuffle", ShuffleStream.class)
|
||||
|
||||
// metrics
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
.withFunctionName("avg", MeanMetric.class)
|
||||
.withFunctionName("sum", SumMetric.class)
|
||||
.withFunctionName("count", CountMetric.class)
|
||||
|
||||
// tuple manipulation operations
|
||||
.withFunctionName("replace", ReplaceOperation.class)
|
||||
.withFunctionName("replace", ReplaceOperation.class)
|
||||
.withFunctionName("concat", ConcatOperation.class)
|
||||
|
||||
// stream reduction operations
|
||||
.withFunctionName("group", GroupOperation.class)
|
||||
.withFunctionName("group", GroupOperation.class)
|
||||
.withFunctionName("distinct", DistinctOperation.class)
|
||||
.withFunctionName("having", HavingStream.class)
|
||||
|
||||
// Stream Evaluators
|
||||
.withFunctionName("val", RawValueEvaluator.class)
|
||||
.withFunctionName("val", RawValueEvaluator.class)
|
||||
|
||||
// Boolean Stream Evaluators
|
||||
.withFunctionName("and", AndEvaluator.class)
|
||||
.withFunctionName("and", AndEvaluator.class)
|
||||
.withFunctionName("eor", ExclusiveOrEvaluator.class)
|
||||
.withFunctionName("eq", EqualsEvaluator.class)
|
||||
.withFunctionName("gt", GreaterThanEvaluator.class)
|
||||
|
@ -232,23 +232,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName("lt", LessThanEvaluator.class)
|
||||
.withFunctionName("lteq", LessThanEqualToEvaluator.class)
|
||||
.withFunctionName("not", NotEvaluator.class)
|
||||
.withFunctionName("or", OrEvaluator.class)
|
||||
.withFunctionName("or", OrEvaluator.class)
|
||||
|
||||
// Date Time Evaluators
|
||||
.withFunctionName(TemporalEvaluatorYear.FUNCTION_NAME, TemporalEvaluatorYear.class)
|
||||
.withFunctionName(TemporalEvaluatorYear.FUNCTION_NAME, TemporalEvaluatorYear.class)
|
||||
.withFunctionName(TemporalEvaluatorMonth.FUNCTION_NAME, TemporalEvaluatorMonth.class)
|
||||
.withFunctionName(TemporalEvaluatorDay.FUNCTION_NAME, TemporalEvaluatorDay.class)
|
||||
.withFunctionName(TemporalEvaluatorDayOfYear.FUNCTION_NAME, TemporalEvaluatorDayOfYear.class)
|
||||
.withFunctionName(TemporalEvaluatorHour.FUNCTION_NAME, TemporalEvaluatorHour.class)
|
||||
.withFunctionName(TemporalEvaluatorHour.FUNCTION_NAME, TemporalEvaluatorHour.class)
|
||||
.withFunctionName(TemporalEvaluatorMinute.FUNCTION_NAME, TemporalEvaluatorMinute.class)
|
||||
.withFunctionName(TemporalEvaluatorSecond.FUNCTION_NAME, TemporalEvaluatorSecond.class)
|
||||
.withFunctionName(TemporalEvaluatorSecond.FUNCTION_NAME, TemporalEvaluatorSecond.class)
|
||||
.withFunctionName(TemporalEvaluatorEpoch.FUNCTION_NAME, TemporalEvaluatorEpoch.class)
|
||||
.withFunctionName(TemporalEvaluatorWeek.FUNCTION_NAME, TemporalEvaluatorWeek.class)
|
||||
.withFunctionName(TemporalEvaluatorQuarter.FUNCTION_NAME, TemporalEvaluatorQuarter.class)
|
||||
.withFunctionName(TemporalEvaluatorDayOfQuarter.FUNCTION_NAME, TemporalEvaluatorDayOfQuarter.class)
|
||||
.withFunctionName(TemporalEvaluatorQuarter.FUNCTION_NAME, TemporalEvaluatorQuarter.class)
|
||||
.withFunctionName(TemporalEvaluatorDayOfQuarter.FUNCTION_NAME, TemporalEvaluatorDayOfQuarter.class)
|
||||
|
||||
// Number Stream Evaluators
|
||||
.withFunctionName("abs", AbsoluteValueEvaluator.class)
|
||||
.withFunctionName("abs", AbsoluteValueEvaluator.class)
|
||||
.withFunctionName("add", AddEvaluator.class)
|
||||
.withFunctionName("div", DivideEvaluator.class)
|
||||
.withFunctionName("mult", MultiplyEvaluator.class)
|
||||
|
@ -256,7 +256,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName("log", NaturalLogEvaluator.class)
|
||||
.withFunctionName("pow", PowerEvaluator.class)
|
||||
.withFunctionName("mod", ModuloEvaluator.class)
|
||||
.withFunctionName("ceil", CeilingEvaluator.class)
|
||||
.withFunctionName("ceil", CeilingEvaluator.class)
|
||||
.withFunctionName("floor", FloorEvaluator.class)
|
||||
.withFunctionName("sin", SineEvaluator.class)
|
||||
.withFunctionName("asin", ArcSineEvaluator.class)
|
||||
|
@ -267,7 +267,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
.withFunctionName("tan", TangentEvaluator.class)
|
||||
.withFunctionName("atan", ArcTangentEvaluator.class)
|
||||
.withFunctionName("tanh", HyperbolicTangentEvaluator.class)
|
||||
.withFunctionName("round", RoundEvaluator.class)
|
||||
.withFunctionName("round", RoundEvaluator.class)
|
||||
.withFunctionName("sqrt", SquareRootEvaluator.class)
|
||||
.withFunctionName("cbrt", CubedRootEvaluator.class)
|
||||
.withFunctionName("coalesce", CoalesceEvaluator.class)
|
||||
|
@ -275,8 +275,8 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
|||
|
||||
|
||||
// Conditional Stream Evaluators
|
||||
.withFunctionName("if", IfThenElseEvaluator.class)
|
||||
.withFunctionName("analyze", AnalyzeEvaluator.class)
|
||||
.withFunctionName("if", IfThenElseEvaluator.class)
|
||||
.withFunctionName("analyze", AnalyzeEvaluator.class)
|
||||
;
|
||||
|
||||
// This pulls all the overrides and additions from the config
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.time.temporal.TemporalAccessor;
|
|||
import java.time.temporal.UnsupportedTemporalTypeException;
|
||||
import java.util.Date;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
|
@ -38,6 +39,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
|||
*/
|
||||
public abstract class TemporalEvaluator extends ComplexEvaluator {
|
||||
|
||||
private String field;
|
||||
|
||||
public TemporalEvaluator(StreamExpression expression, StreamFactory factory) throws IOException {
|
||||
super(expression, factory);
|
||||
|
||||
|
@ -58,21 +61,32 @@ public abstract class TemporalEvaluator extends ComplexEvaluator {
|
|||
|
||||
if (tupleValue == null) return null;
|
||||
|
||||
if (tupleValue instanceof String) {
|
||||
instant = getInstant((String) tupleValue);
|
||||
} else if (tupleValue instanceof Long) {
|
||||
instant = Instant.ofEpochMilli((Long)tupleValue);
|
||||
} else if (tupleValue instanceof Instant) {
|
||||
instant = (Instant) tupleValue;
|
||||
} else if (tupleValue instanceof Date) {
|
||||
instant = ((Date) tupleValue).toInstant();
|
||||
} else if (tupleValue instanceof TemporalAccessor) {
|
||||
date = ((TemporalAccessor) tupleValue);
|
||||
if(field == null) {
|
||||
field = streamEvaluator.toExpression(constructingFactory).toString();
|
||||
}
|
||||
|
||||
Map tupleContext = streamContext.getTupleContext();
|
||||
date = (LocalDateTime)tupleContext.get(field); // Check to see if the date has already been created for this field
|
||||
|
||||
if(date == null) {
|
||||
if (tupleValue instanceof String) {
|
||||
instant = getInstant((String) tupleValue);
|
||||
} else if (tupleValue instanceof Long) {
|
||||
instant = Instant.ofEpochMilli((Long) tupleValue);
|
||||
} else if (tupleValue instanceof Instant) {
|
||||
instant = (Instant) tupleValue;
|
||||
} else if (tupleValue instanceof Date) {
|
||||
instant = ((Date) tupleValue).toInstant();
|
||||
} else if (tupleValue instanceof TemporalAccessor) {
|
||||
date = ((TemporalAccessor) tupleValue);
|
||||
tupleContext.put(field, date); // Cache the date in the TupleContext
|
||||
}
|
||||
}
|
||||
|
||||
if (instant != null) {
|
||||
if (TemporalEvaluatorEpoch.FUNCTION_NAME.equals(getFunction())) return instant.toEpochMilli();
|
||||
date = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
|
||||
tupleContext.put(field, date); // Cache the date in the TupleContext
|
||||
}
|
||||
|
||||
if (date != null) {
|
||||
|
|
|
@ -43,6 +43,7 @@ public class HavingStream extends TupleStream implements Expressible {
|
|||
|
||||
private TupleStream stream;
|
||||
private BooleanEvaluator evaluator;
|
||||
private StreamContext streamContext;
|
||||
|
||||
private transient Tuple currentGroupHead;
|
||||
|
||||
|
@ -128,6 +129,7 @@ public class HavingStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.streamContext = context;
|
||||
this.stream.setStreamContext(context);
|
||||
}
|
||||
|
||||
|
@ -152,6 +154,7 @@ public class HavingStream extends TupleStream implements Expressible {
|
|||
return tuple;
|
||||
}
|
||||
|
||||
streamContext.getTupleContext().clear();
|
||||
if(evaluator.evaluate(tuple)){
|
||||
return tuple;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ public class SelectStream extends TupleStream implements Expressible {
|
|||
private static final long serialVersionUID = 1;
|
||||
|
||||
private TupleStream stream;
|
||||
private StreamContext streamContext;
|
||||
private Map<String,String> selectedFields;
|
||||
private Map<StreamEvaluator,String> selectedEvaluators;
|
||||
private List<StreamOperation> operations;
|
||||
|
@ -213,6 +214,7 @@ public class SelectStream extends TupleStream implements Expressible {
|
|||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.streamContext = context;
|
||||
this.stream.setStreamContext(context);
|
||||
Set<StreamEvaluator> evaluators = selectedEvaluators.keySet();
|
||||
|
||||
|
@ -245,6 +247,14 @@ public class SelectStream extends TupleStream implements Expressible {
|
|||
// create a copy with the limited set of fields
|
||||
Tuple workingToReturn = new Tuple(new HashMap<>());
|
||||
Tuple workingForEvaluators = new Tuple(new HashMap<>());
|
||||
|
||||
//Clear the TupleContext before running the evaluators.
|
||||
//The TupleContext allows evaluators to cache values within the scope of a single tuple.
|
||||
//For example a LocalDateTime could be parsed by one evaluator and used by other evaluators within the scope of the tuple.
|
||||
//This avoids the need to create multiple LocalDateTime instances for the same tuple to satisfy a select expression.
|
||||
|
||||
streamContext.getTupleContext().clear();
|
||||
|
||||
for(Object fieldName : original.fields.keySet()){
|
||||
workingForEvaluators.put(fieldName, original.get(fieldName));
|
||||
if(selectedFields.containsKey(fieldName)){
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
|||
public class StreamContext implements Serializable{
|
||||
|
||||
private Map entries = new HashMap();
|
||||
private Map tupleContext = new HashMap();
|
||||
public int workerID;
|
||||
public int numWorkers;
|
||||
private SolrClientCache clientCache;
|
||||
|
@ -78,6 +79,10 @@ public class StreamContext implements Serializable{
|
|||
this.streamFactory = streamFactory;
|
||||
}
|
||||
|
||||
public Map getTupleContext() {
|
||||
return tupleContext;
|
||||
}
|
||||
|
||||
public StreamFactory getStreamFactory() {
|
||||
return this.streamFactory;
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.solr.client.solrj.io.eval.TemporalEvaluatorQuarter;
|
|||
import org.apache.solr.client.solrj.io.eval.TemporalEvaluatorSecond;
|
||||
import org.apache.solr.client.solrj.io.eval.TemporalEvaluatorWeek;
|
||||
import org.apache.solr.client.solrj.io.eval.TemporalEvaluatorYear;
|
||||
import org.apache.solr.client.solrj.io.stream.StreamContext;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
|
||||
|
@ -50,7 +51,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
|||
import org.junit.Test;
|
||||
|
||||
import static junit.framework.Assert.assertEquals;
|
||||
import static junit.framework.Assert.assertNotNull;
|
||||
import static junit.framework.Assert.assertNull;
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
|
@ -90,6 +90,8 @@ public class TemporalEvaluatorsTest {
|
|||
|
||||
try {
|
||||
evaluator = factory.constructEvaluator("week()");
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
assertTrue(e.getCause().getCause().getMessage().contains("Invalid expression week()"));
|
||||
|
@ -97,6 +99,8 @@ public class TemporalEvaluatorsTest {
|
|||
|
||||
try {
|
||||
evaluator = factory.constructEvaluator("week(a, b)");
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
assertTrue(e.getCause().getCause().getMessage().contains("expecting one value but found 2"));
|
||||
|
@ -104,6 +108,8 @@ public class TemporalEvaluatorsTest {
|
|||
|
||||
try {
|
||||
evaluator = factory.constructEvaluator("Week()");
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
assertTrue(e.getMessage().contains("Invalid evaluator expression Week() - function 'Week' is unknown"));
|
||||
|
@ -115,9 +121,12 @@ public class TemporalEvaluatorsTest {
|
|||
public void testInvalidValues() throws Exception {
|
||||
StreamEvaluator evaluator = factory.constructEvaluator("year(a)");
|
||||
|
||||
|
||||
try {
|
||||
values.clear();
|
||||
values.put("a", 12);
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
Object result = evaluator.evaluate(new Tuple(values));
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
|
@ -127,6 +136,8 @@ public class TemporalEvaluatorsTest {
|
|||
try {
|
||||
values.clear();
|
||||
values.put("a", "1995-12-31");
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
Object result = evaluator.evaluate(new Tuple(values));
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
|
@ -136,6 +147,8 @@ public class TemporalEvaluatorsTest {
|
|||
try {
|
||||
values.clear();
|
||||
values.put("a", "");
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
Object result = evaluator.evaluate(new Tuple(values));
|
||||
assertTrue(false);
|
||||
} catch (IOException e) {
|
||||
|
@ -267,6 +280,8 @@ public class TemporalEvaluatorsTest {
|
|||
|
||||
public void testFunction(String expression, Object value, Number expected) throws Exception {
|
||||
StreamEvaluator evaluator = factory.constructEvaluator(expression);
|
||||
StreamContext streamContext = new StreamContext();
|
||||
evaluator.setStreamContext(streamContext);
|
||||
values.clear();
|
||||
values.put("a", value);
|
||||
Object result = evaluator.evaluate(new Tuple(values));
|
||||
|
|
Loading…
Reference in New Issue