From 30924f23d6834605b9bf2d24509755ff61c4e878 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Mon, 8 Jun 2020 16:03:07 +0200 Subject: [PATCH] SOLR-14470: Add streaming expressions to /export handler. --- solr/CHANGES.txt | 2 + .../org/apache/solr/handler/CatStream.java | 21 +- .../apache/solr/handler/ExportHandler.java | 69 ++++- .../org/apache/solr/handler/GraphHandler.java | 12 +- .../org/apache/solr/handler/SQLHandler.java | 10 +- .../apache/solr/handler/StreamHandler.java | 54 ++-- .../solr/handler/export/ExportWriter.java | 246 +++++++++++++++++- .../handler/export/SingleValueSortDoc.java | 5 + .../apache/solr/handler/export/SortDoc.java | 4 + .../handler/export/StringFieldWriter.java | 16 +- .../solr/handler/export/StringValue.java | 22 +- .../apache/solr/handler/sql/LimitStream.java | 6 +- .../solr/response/GraphMLResponseWriter.java | 4 +- .../org/apache/solr/core/HelloStream.java | 10 +- .../solr/handler/export/TestExportWriter.java | 53 +++- .../src/exporting-result-sets.adoc | 28 ++ .../apache/solr/client/solrj/io/Tuple.java | 154 +++++++++-- .../client/solrj/io/eval/AnovaEvaluator.java | 11 +- .../io/eval/ChiSquareDataSetEvaluator.java | 11 +- .../solrj/io/eval/DescribeEvaluator.java | 30 +-- .../io/eval/FrequencyTableEvaluator.java | 16 +- .../solrj/io/eval/GTestDataSetEvaluator.java | 11 +- .../solrj/io/eval/HistogramEvaluator.java | 24 +- .../io/eval/KolmogorovSmirnovEvaluator.java | 19 +- .../solrj/io/eval/MannWhitneyUEvaluator.java | 11 +- .../solrj/io/eval/OutliersEvaluator.java | 3 +- .../solrj/io/eval/PairedTTestEvaluator.java | 9 +- .../solrj/io/eval/RecursiveEvaluator.java | 15 +- .../solrj/io/eval/SetValueEvaluator.java | 10 +- .../client/solrj/io/eval/TTestEvaluator.java | 11 +- .../solrj/io/graph/GatherNodesStream.java | 13 +- .../solr/client/solrj/io/graph/Node.java | 16 +- .../solrj/io/graph/ShortestPathStream.java | 10 +- .../client/solrj/io/ops/GroupOperation.java | 9 +- .../solrj/io/stream/CalculatorStream.java | 12 +- .../client/solrj/io/stream/CellStream.java | 7 +- .../solrj/io/stream/CloudSolrStream.java | 9 +- .../client/solrj/io/stream/CommitStream.java | 2 +- .../client/solrj/io/stream/CsvStream.java | 3 +- .../client/solrj/io/stream/DaemonStream.java | 12 +- .../solrj/io/stream/DeepRandomStream.java | 9 +- .../client/solrj/io/stream/EchoStream.java | 11 +- .../solrj/io/stream/ExceptionStream.java | 12 +- .../client/solrj/io/stream/Facet2DStream.java | 11 +- .../client/solrj/io/stream/FacetStream.java | 12 +- .../io/stream/FeaturesSelectionStream.java | 20 +- .../client/solrj/io/stream/GetStream.java | 10 +- .../solrj/io/stream/HashRollupStream.java | 8 +- .../client/solrj/io/stream/JDBCStream.java | 23 +- .../client/solrj/io/stream/KnnStream.java | 11 +- .../client/solrj/io/stream/ListStream.java | 5 +- .../client/solrj/io/stream/ModelStream.java | 4 +- .../client/solrj/io/stream/NoOpStream.java | 6 +- .../client/solrj/io/stream/NullStream.java | 3 +- .../solrj/io/stream/ParallelListStream.java | 7 +- .../solrj/io/stream/ParallelStream.java | 8 +- .../client/solrj/io/stream/PlotStream.java | 10 +- .../client/solrj/io/stream/RandomStream.java | 13 +- .../client/solrj/io/stream/RollupStream.java | 16 +- .../solrj/io/stream/ScoreNodesStream.java | 8 +- .../client/solrj/io/stream/SearchStream.java | 13 +- .../client/solrj/io/stream/SelectStream.java | 6 +- .../io/stream/SignificantTermsStream.java | 4 +- .../client/solrj/io/stream/SolrStream.java | 8 +- .../client/solrj/io/stream/StatsStream.java | 8 +- .../solrj/io/stream/TextLogitStream.java | 27 
+- .../solrj/io/stream/TimeSeriesStream.java | 9 +- .../client/solrj/io/stream/TupStream.java | 12 +- .../client/solrj/io/stream/UpdateStream.java | 18 +- .../client/solrj/io/stream/ZplotStream.java | 29 +-- .../solrj/io/stream/expr/StreamFactory.java | 234 +++++++++-------- .../solrj/io/stream/metrics/CountMetric.java | 4 +- .../solr/common/params/StreamParams.java | 41 +++ .../solrj/io/stream/CloudAuthStreamTest.java | 4 +- .../solrj/io/stream/JDBCStreamTest.java | 4 +- .../io/stream/SelectWithEvaluatorsTest.java | 4 +- .../solrj/io/stream/StreamDecoratorTest.java | 6 +- .../solrj/io/stream/StreamExpressionTest.java | 4 +- 78 files changed, 999 insertions(+), 633 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 26994d696bb..19970d42879 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -94,6 +94,8 @@ New Features * SOLR-14476: Add percentiles and standard deviation aggregations to stats, facet and timeseries Streaming Expressions (Joel Bernstein) +* SOLR-14470: Add streaming expressions to /export handler. (ab, Joel Bernstein) + Improvements --------------------- * SOLR-14316: Remove unchecked type conversion warning in JavaBinCodec's readMapEntry's equals() method diff --git a/solr/core/src/java/org/apache/solr/handler/CatStream.java b/solr/core/src/java/org/apache/solr/handler/CatStream.java index 696fb89ad49..806c94a381d 100644 --- a/solr/core/src/java/org/apache/solr/handler/CatStream.java +++ b/solr/core/src/java/org/apache/solr/handler/CatStream.java @@ -24,7 +24,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -126,14 +125,14 @@ public class CatStream extends TupleStream implements Expressible { public Tuple read() throws IOException { if (maxLines >= 0 && linesReturned >= maxLines) { closeCurrentFileIfSet(); - return createEofTuple(); + return Tuple.EOF(); } else if (currentFileHasMoreLinesToRead()) { return fetchNextLineFromCurrentFile(); } else if (advanceToNextFileWithData()) { return fetchNextLineFromCurrentFile(); } else { // No more data closeCurrentFileIfSet(); - return createEofTuple(); + return Tuple.EOF(); } } @@ -201,18 +200,10 @@ public class CatStream extends TupleStream implements Expressible { private Tuple fetchNextLineFromCurrentFile() { linesReturned++; - @SuppressWarnings({"rawtypes"}) - HashMap m = new HashMap(); - m.put("file", currentFilePath.displayPath); - m.put("line", currentFileLines.next()); - return new Tuple(m); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private Tuple createEofTuple() { - HashMap m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); + return new Tuple( + "file", currentFilePath.displayPath, + "line", currentFileLines.next() + ); } private boolean currentFileHasMoreLinesToRead() { diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java index ea9239d0265..04800a3abe0 100644 --- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java @@ -18,20 +18,87 @@ package org.apache.solr.handler; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import 
org.apache.solr.client.solrj.io.ModelCache; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.MapSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; import org.apache.solr.handler.component.SearchHandler; import org.apache.solr.handler.export.ExportWriter; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.solr.common.params.CommonParams.JSON; public class ExportHandler extends SearchHandler { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private ModelCache modelCache = null; + private ConcurrentMap objectCache = new ConcurrentHashMap(); + private SolrDefaultStreamFactory streamFactory = new ExportHandlerStreamFactory(); + private String coreName; + private SolrClientCache solrClientCache; + private StreamContext initialStreamContext; + + public static class ExportHandlerStreamFactory extends SolrDefaultStreamFactory { + static final String[] forbiddenStreams = new String[] { + // source streams + "search", "facet", "facet2D", "update", "delete", "jdbc", "topic", + "commit", "random", "knnSearch", + // execution streams + "parallel", "executor", "daemon" + // other streams? + }; + + public ExportHandlerStreamFactory() { + super(); + for (String function : forbiddenStreams) { + this.withoutFunctionName(function); + } + this.withFunctionName("input", ExportWriter.ExportWriterStream.class); + } + } + + @Override + public void inform(SolrCore core) { + super.inform(core); + String defaultCollection; + String defaultZkhost; + CoreContainer coreContainer = core.getCoreContainer(); + this.solrClientCache = coreContainer.getSolrClientCache(); + this.coreName = core.getName(); + + if (coreContainer.isZooKeeperAware()) { + defaultCollection = core.getCoreDescriptor().getCollectionName(); + defaultZkhost = core.getCoreContainer().getZkController().getZkServerAddress(); + streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost); + streamFactory.withDefaultZkHost(defaultZkhost); + modelCache = new ModelCache(250, + defaultZkhost, + solrClientCache); + } + streamFactory.withSolrResourceLoader(core.getResourceLoader()); + StreamHandler.addExpressiblePlugins(streamFactory, core); + initialStreamContext = new StreamContext(); + initialStreamContext.setStreamFactory(streamFactory); + initialStreamContext.setSolrClientCache(solrClientCache); + initialStreamContext.setModelCache(modelCache); + initialStreamContext.setObjectCache(objectCache); + initialStreamContext.put("core", this.coreName); + initialStreamContext.put("solr-core", core); + } + @Override public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { try { @@ -44,6 +111,6 @@ public class ExportHandler extends SearchHandler { Map map = new HashMap<>(1); map.put(CommonParams.WT, ReplicationHandler.FILE_STREAM); req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map),req.getParams())); - rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt)); + rsp.add(ReplicationHandler.FILE_STREAM, new ExportWriter(req, rsp, wt, initialStreamContext)); } } diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java 
b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java index 0a4bd82a08f..5c159e70d97 100644 --- a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java @@ -20,7 +20,6 @@ package org.apache.solr.handler; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -40,6 +39,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.StreamParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.PluginInfo; @@ -206,14 +206,8 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P return null; } - @SuppressWarnings({"unchecked"}) public Tuple read() { - String msg = e.getMessage(); - @SuppressWarnings({"rawtypes"}) - Map m = new HashMap(); - m.put("EOF", true); - m.put("EXCEPTION", msg); - return new Tuple(m); + return Tuple.EXCEPTION(e.getMessage(), true); } } @@ -265,7 +259,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P Tuple tuple = this.tupleStream.read(); if(tuple.EOF) { long totalTime = (System.nanoTime() - begin) / 1000000; - tuple.fields.put("RESPONSE_TIME", totalTime); + tuple.put(StreamParams.RESPONSE_TIME, totalTime); } return tuple; } diff --git a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java index 6b0330add83..8bc1491f78c 100644 --- a/solr/core/src/java/org/apache/solr/handler/SQLHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/SQLHandler.java @@ -159,7 +159,7 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per // Return a metadata tuple as the first tuple and then pass through to the JDBCStream. 
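      // For illustration only (hypothetical values), the metadata tuple has the shape:
      //   { "isMetadata": true, "fields": ["id", "price_f"], "aliases": { "price_f": "price" } }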
if(firstTuple) { try { - Map fields = new HashMap<>(); + Tuple tuple = new Tuple(); firstTuple = false; @@ -173,10 +173,10 @@ public class SQLHandler extends RequestHandlerBase implements SolrCoreAware, Per } if(includeMetadata) { - fields.put("isMetadata", true); - fields.put("fields", metadataFields); - fields.put("aliases", metadataAliases); - return new Tuple(fields); + tuple.put("isMetadata", true); + tuple.put("fields", metadataFields); + tuple.put("aliases", metadataAliases); + return tuple; } } catch (SQLException e) { throw new IOException(e); diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 75940ebe7ea..f1b15445dc2 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -53,6 +53,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.StreamParams; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.PluginInfo; import org.apache.solr.core.SolrConfig; @@ -176,10 +177,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, TupleStream tupleStream; try { - StreamExpression streamExpression = StreamExpressionParser.parse(params.get("expr")); + StreamExpression streamExpression = StreamExpressionParser.parse(params.get(StreamParams.EXPR)); if (this.streamFactory.isEvaluator(streamExpression)) { - StreamExpression tupleExpression = new StreamExpression("tuple"); - tupleExpression.addParameter(new StreamExpressionNamedParameter("return-value", streamExpression)); + StreamExpression tupleExpression = new StreamExpression(StreamParams.TUPLE); + tupleExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, streamExpression)); tupleStream = this.streamFactory.constructStream(tupleExpression); } else { tupleStream = this.streamFactory.constructStream(streamExpression); @@ -188,7 +189,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, // Catch exceptions that occur while the stream is being created. This will include streaming expression parse // rules. 
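      // (If construction fails, the client still receives a well-formed result-set: DummyErrorStream below
      // emits a single tuple marked EOF that carries the EXCEPTION message.)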
SolrException.log(log, e); - rsp.add("result-set", new DummyErrorStream(e)); + rsp.add(StreamParams.RESULT_SET, new DummyErrorStream(e)); return; } @@ -241,9 +242,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, daemonStream.setDaemons(daemons); daemonStream.open(); // This will start the daemonStream daemons.put(daemonStream.getId(), daemonStream); - rsp.add("result-set", new DaemonResponseStream("Daemon:" + daemonStream.getId() + " started on " + coreName)); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + daemonStream.getId() + " started on " + coreName)); } else { - rsp.add("result-set", new TimerStream(new ExceptionStream(tupleStream))); + rsp.add(StreamParams.RESULT_SET, new TimerStream(new ExceptionStream(tupleStream))); } } @@ -256,40 +257,40 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, if ("list".equals(action)) { Collection vals = daemons.values(); - rsp.add("result-set", new DaemonCollectionStream(vals)); + rsp.add(StreamParams.RESULT_SET, new DaemonCollectionStream(vals)); return; } String id = params.get(ID); DaemonStream d = daemons.get(id); if (d == null) { - rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " not found on " + coreName)); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " not found on " + coreName)); return; } switch (action) { case "stop": d.close(); - rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " stopped on " + coreName)); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " stopped on " + coreName)); break; case "start": try { d.open(); } catch (IOException e) { - rsp.add("result-set", new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage())); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon: " + id + " error: " + e.getMessage())); } - rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " started on " + coreName)); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " started on " + coreName)); break; case "kill": daemons.remove(id); d.close(); // we already found it in the daemons list, so we don't need to verify we removed it. 
- rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " killed on " + coreName)); + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " killed on " + coreName)); break; default: - rsp.add("result-set", new DaemonResponseStream("Daemon:" + id + " action '" + rsp.add(StreamParams.RESULT_SET, new DaemonResponseStream("Daemon:" + id + " action '" + action + "' not recognized on " + coreName)); break; } @@ -344,7 +345,6 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withExpression("--non-expressible--"); } - @SuppressWarnings({"unchecked"}) public Tuple read() { String msg = e.getMessage(); @@ -353,12 +353,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, msg = t.getMessage(); t = t.getCause(); } - - @SuppressWarnings({"rawtypes"}) - Map m = new HashMap(); - m.put("EOF", true); - m.put("EXCEPTION", msg); - return new Tuple(m); + return Tuple.EXCEPTION(msg, true); } } @@ -396,15 +391,11 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withExpression("--non-expressible--"); } - @SuppressWarnings({"unchecked"}) public Tuple read() { if (it.hasNext()) { return it.next().getInfo(); } else { - @SuppressWarnings({"rawtypes"}) - Map m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } } } @@ -444,19 +435,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withExpression("--non-expressible--"); } - @SuppressWarnings({"unchecked"}) public Tuple read() { if (sendEOF) { - @SuppressWarnings({"rawtypes"}) - Map m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } else { sendEOF = true; - @SuppressWarnings({"rawtypes"}) - Map m = new HashMap(); - m.put("DaemonOp", message); - return new Tuple(m); + return new Tuple("DaemonOp", message); } } } @@ -506,7 +490,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, Tuple tuple = this.tupleStream.read(); if (tuple.EOF) { long totalTime = (System.nanoTime() - begin) / 1000000; - tuple.fields.put("RESPONSE_TIME", totalTime); + tuple.put(StreamParams.RESPONSE_TIME, totalTime); } return tuple; } diff --git a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java index adacd776bb6..3eccd0d9cd5 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/export/ExportWriter.java @@ -35,12 +35,29 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.util.BitSetIterator; import org.apache.lucene.util.FixedBitSet; import org.apache.solr.client.solrj.impl.BinaryResponseParser; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.ComparatorOrder; +import org.apache.solr.client.solrj.io.comp.FieldComparator; +import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import 
org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.common.IteratorWriter; import org.apache.solr.common.MapWriter; import org.apache.solr.common.MapWriter.EntryWriter; import org.apache.solr.common.PushWriter; import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.StreamParams; import org.apache.solr.common.util.JavaBinCodec; import org.apache.solr.core.SolrCore; import org.apache.solr.request.SolrQueryRequest; @@ -84,23 +101,164 @@ import static org.apache.solr.common.util.Utils.makeMap; * once), and it allows {@link ExportWriter} to scale well with regard to numDocs. */ public class ExportWriter implements SolrCore.RawWriter, Closeable { - private static final int DOCUMENT_BATCH_SIZE = 30000; private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int DOCUMENT_BATCH_SIZE = 30000; + + private static final String EXPORT_WRITER_KEY = "__ew__"; + private static final String SORT_DOCS_KEY = "_ew_docs_"; + private static final String TOTAL_HITS_KEY = "_ew_totalHits_"; + private static final String LEAF_READERS_KEY = "_ew_leaves_"; + private static final String SORT_QUEUE_KEY = "_ew_queue_"; + private static final String SORT_DOC_KEY = "_ew_sort_"; + private OutputStreamWriter respWriter; final SolrQueryRequest req; final SolrQueryResponse res; + final StreamContext initialStreamContext; + StreamExpression streamExpression; + StreamContext streamContext; FieldWriter[] fieldWriters; int totalHits = 0; FixedBitSet[] sets = null; PushWriter writer; private String wt; + private static class TupleEntryWriter implements EntryWriter { + Tuple tuple; - public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt) { + void setTuple(Tuple tuple) { + this.tuple = tuple; + } + + @Override + public EntryWriter put(CharSequence k, Object v) throws IOException { + tuple.put(k, v); + return this; + } + } + + public static class ExportWriterStream extends TupleStream implements Expressible { + StreamContext context; + StreamComparator streamComparator; + int pos = -1; + int outDocIndex = -1; + int count; + SortDoc sortDoc; + SortQueue queue; + SortDoc[] docs; + int totalHits; + ExportWriter exportWriter; + List leaves; + final TupleEntryWriter entryWriter = new TupleEntryWriter(); + + public ExportWriterStream(StreamExpression expression, StreamFactory factory) throws IOException { + streamComparator = parseComp(factory.getDefaultSort()); + } + + @Override + public void setStreamContext(StreamContext context) { + this.context = context; + } + + @Override + public List children() { + return null; + } + + private StreamComparator parseComp(String sort) throws IOException { + + String[] sorts = sort.split(","); + StreamComparator[] comps = new StreamComparator[sorts.length]; + for(int i=0; i 1) { + return new MultipleFieldComparator(comps); + } else { + return comps[0]; + } + } + + @Override + public void open() throws IOException { + docs = (SortDoc[]) context.get(SORT_DOCS_KEY); + queue = (SortQueue) context.get(SORT_QUEUE_KEY); + sortDoc = (SortDoc) context.get(SORT_DOC_KEY); + totalHits = (Integer) context.get(TOTAL_HITS_KEY); + 
exportWriter = (ExportWriter) context.get(EXPORT_WRITER_KEY); + leaves = (List) context.get(LEAF_READERS_KEY); + count = 0; + } + + @Override + public void close() throws IOException { + exportWriter = null; + leaves = null; + } + + @Override + public Tuple read() throws IOException { + if (pos < 0) { + if (count < totalHits) { + outDocIndex = exportWriter.fillOutDocs(leaves, sortDoc, queue, docs); + count += (outDocIndex + 1); + pos = outDocIndex; + } else { + return Tuple.EOF(); + } + } + if (pos < 0) { + return Tuple.EOF(); + } + Tuple tuple = new Tuple(); + entryWriter.setTuple(tuple); + SortDoc s = docs[pos]; + exportWriter.writeDoc(s, leaves, entryWriter); + s.reset(); + pos--; + return tuple; + } + + @Override + public StreamComparator getStreamSort() { + return streamComparator; + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + return new StreamExplanation(getStreamNodeId().toString()) + .withFunctionName("input") + .withImplementingClass(this.getClass().getName()) + .withExpressionType(Explanation.ExpressionType.STREAM_SOURCE) + .withExpression("--non-expressible--"); + } + } + + + public ExportWriter(SolrQueryRequest req, SolrQueryResponse res, String wt, StreamContext initialStreamContext) { this.req = req; this.res = res; this.wt = wt; - + this.initialStreamContext = initialStreamContext; } @Override @@ -216,6 +374,36 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { return; } + String expr = params.get(StreamParams.EXPR); + if (expr != null) { + StreamFactory streamFactory = initialStreamContext.getStreamFactory(); + streamFactory.withDefaultSort(params.get(CommonParams.SORT)); + try { + StreamExpression expression = StreamExpressionParser.parse(expr); + if (streamFactory.isEvaluator(expression)) { + streamExpression = new StreamExpression(StreamParams.TUPLE); + streamExpression.addParameter(new StreamExpressionNamedParameter(StreamParams.RETURN_VALUE, expression)); + } else { + streamExpression = expression; + } + } catch (Exception e) { + writeException(e, writer, true); + return; + } + streamContext = new StreamContext(); + streamContext.setRequestParams(params); + streamContext.setLocal(true); + + streamContext.workerID = 0; + streamContext.numWorkers = 1; + streamContext.setSolrClientCache(initialStreamContext.getSolrClientCache()); + streamContext.setModelCache(initialStreamContext.getModelCache()); + streamContext.setObjectCache(initialStreamContext.getObjectCache()); + streamContext.put("core", req.getCore().getName()); + streamContext.put("solr-core", req.getCore()); + streamContext.put(CommonParams.SORT, params.get(CommonParams.SORT)); + } + writer.writeMap(m -> { m.put("responseHeader", singletonMap("status", 0)); m.put("response", (MapWriter) mw -> { @@ -223,7 +411,18 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { mw.put("docs", (IteratorWriter) iw -> writeDocs(req, iw, sort)); }); }); + if (streamContext != null) { + streamContext = null; + } + } + private TupleStream createTupleStream() throws IOException { + StreamFactory streamFactory = (StreamFactory)initialStreamContext.getStreamFactory().clone(); + //Set the sort in the stream factory so it can be used during initialization. 
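+    // (The value comes from the handler's "sort" parameter, e.g. "timestamp desc"; decorators that
+    // declare their own sort, such as top(), are expected to use a sort spec compatible with it.)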
+ streamFactory.withDefaultSort(((String)streamContext.get(CommonParams.SORT))); + TupleStream tupleStream = streamFactory.constructStream(streamExpression); + tupleStream.setStreamContext(streamContext); + return tupleStream; } protected void identifyLowestSortingUnexportedDocs(List leaves, SortDoc sortDoc, SortQueue queue) throws IOException { @@ -285,22 +484,47 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable { protected void writeDocs(SolrQueryRequest req, IteratorWriter.ItemWriter writer, Sort sort) throws IOException { List leaves = req.getSearcher().getTopReaderContext().leaves(); SortDoc sortDoc = getSortDoc(req.getSearcher(), sort.getSort()); - int count = 0; final int queueSize = Math.min(DOCUMENT_BATCH_SIZE, totalHits); SortQueue queue = new SortQueue(queueSize, sortDoc); SortDoc[] outDocs = new SortDoc[queueSize]; - while (count < totalHits) { - identifyLowestSortingUnexportedDocs(leaves, sortDoc, queue); - int outDocsIndex = transferBatchToArrayForOutput(queue, outDocs); - - count += (outDocsIndex + 1); - addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex); + if (streamExpression != null) { + streamContext.put(SORT_DOCS_KEY, outDocs); + streamContext.put(SORT_QUEUE_KEY, queue); + streamContext.put(SORT_DOC_KEY, sortDoc); + streamContext.put(TOTAL_HITS_KEY, totalHits); + streamContext.put(EXPORT_WRITER_KEY, this); + streamContext.put(LEAF_READERS_KEY, leaves); + TupleStream tupleStream = createTupleStream(); + tupleStream.open(); + for (;;) { + final Tuple t = tupleStream.read(); + if (t == null) { + break; + } + if (t.EOF) { + break; + } + writer.add((MapWriter) ew -> t.writeMap(ew)); + } + tupleStream.close(); + } else { + for (int count = 0; count < totalHits; ) { + int outDocsIndex = fillOutDocs(leaves, sortDoc, queue, outDocs); + count += (outDocsIndex + 1); + addDocsToItemWriter(leaves, writer, outDocs, outDocsIndex); + } } } - protected void writeDoc(SortDoc sortDoc, + private int fillOutDocs(List leaves, SortDoc sortDoc, + SortQueue sortQueue, SortDoc[] outDocs) throws IOException { + identifyLowestSortingUnexportedDocs(leaves, sortDoc, sortQueue); + return transferBatchToArrayForOutput(sortQueue, outDocs); + } + + void writeDoc(SortDoc sortDoc, List leaves, EntryWriter ew) throws IOException { diff --git a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java index 963901c40e9..164c07b6de9 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java +++ b/solr/core/src/java/org/apache/solr/handler/export/SingleValueSortDoc.java @@ -32,6 +32,11 @@ class SingleValueSortDoc extends SortDoc { return null; } + @Override + public SortValue[] getSortValues() { + return new SortValue[] { value1 }; + } + public void setNextReader(LeafReaderContext context) throws IOException { this.ord = context.ord; this.docBase = context.docBase; diff --git a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java index 5e2c75de0ea..292e795da4e 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java +++ b/solr/core/src/java/org/apache/solr/handler/export/SortDoc.java @@ -45,6 +45,10 @@ class SortDoc { return null; } + public SortValue[] getSortValues() { + return sortValues; + } + public void setNextReader(LeafReaderContext context) throws IOException { this.ord = context.ord; this.docBase = context.docBase; diff --git 
a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java index c14e4d7770d..b82c365c1df 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java +++ b/solr/core/src/java/org/apache/solr/handler/export/StringFieldWriter.java @@ -72,9 +72,21 @@ class StringFieldWriter extends FieldWriter { if (ew instanceof JavaBinCodec.BinEntryWriter) { ew.put(this.field, utf8.reset(ref.bytes, ref.offset, ref.length, null)); } else { - fieldType.indexedToReadable(ref, cref); - String v = cref.toString(); + String v = null; + if(sortValue != null) { + v = ((StringValue) sortValue).getLastString(); + if(v == null) { + fieldType.indexedToReadable(ref, cref); + v = cref.toString(); + ((StringValue) sortValue).setLastString(v); + } + } else { + fieldType.indexedToReadable(ref, cref); + v = cref.toString(); + } + ew.put(this.field, v); + } return true; } diff --git a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java index 5df4eebf81b..fc7056599f8 100644 --- a/solr/core/src/java/org/apache/solr/handler/export/StringValue.java +++ b/solr/core/src/java/org/apache/solr/handler/export/StringValue.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.MultiDocValues; import org.apache.lucene.index.OrdinalMap; import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.LongValues; class StringValue implements SortValue { @@ -40,6 +41,10 @@ class StringValue implements SortValue { protected int lastDocID; private boolean present; + private BytesRef lastBytes; + private String lastString; + private int lastOrd = -1; + public StringValue(SortedDocValues globalDocValues, String field, IntComp comp) { this.globalDocValues = globalDocValues; this.docValues = globalDocValues; @@ -52,6 +57,14 @@ class StringValue implements SortValue { this.present = false; } + public String getLastString() { + return this.lastString; + } + + public void setLastString(String lastString) { + this.lastString = lastString; + } + public StringValue copy() { return new StringValue(globalDocValues, field, comp); } @@ -88,7 +101,12 @@ class StringValue implements SortValue { public Object getCurrentValue() throws IOException { assert present == true; - return docValues.lookupOrd(currentOrd); + if (currentOrd != lastOrd) { + lastBytes = docValues.lookupOrd(currentOrd); + lastOrd = currentOrd; + lastString = null; + } + return lastBytes; } public String getField() { @@ -109,7 +127,7 @@ class StringValue implements SortValue { } public int compareTo(SortValue o) { - StringValue sv = (StringValue)o; + StringValue sv = (StringValue) o; return comp.compare(currentOrd, sv.currentOrd); } diff --git a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java index 0d4bb72adf4..772f639a762 100644 --- a/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java +++ b/solr/core/src/java/org/apache/solr/handler/sql/LimitStream.java @@ -26,9 +26,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; class LimitStream extends TupleStream { @@ -79,9 +77,7 @@ class LimitStream extends TupleStream { public Tuple read() throws IOException { 
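    // Pass tuples through until the limit is exceeded, then emit a standard EOF marker tuple.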
++count; if(count > limit) { - Map fields = new HashMap<>(); - fields.put("EOF", "true"); - return new Tuple(fields); + return Tuple.EOF(); } return stream.read(); diff --git a/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java index 9bb7403d2e8..926d79ff5d0 100644 --- a/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java +++ b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java @@ -95,9 +95,9 @@ public class GraphMLResponseWriter implements QueryResponseWriter { printWriter.write(" outfields = new ArrayList(); - Iterator keys = tuple.fields.keySet().iterator(); + Iterator keys = tuple.getFields().keySet().iterator(); while(keys.hasNext()) { - String key = keys.next(); + String key = String.valueOf(keys.next()); if(key.equals("node") || key.equals("ancestors") || key.equals("collection")) { continue; } else { diff --git a/solr/core/src/test/org/apache/solr/core/HelloStream.java b/solr/core/src/test/org/apache/solr/core/HelloStream.java index be285e5886d..370200504dc 100644 --- a/solr/core/src/test/org/apache/solr/core/HelloStream.java +++ b/solr/core/src/test/org/apache/solr/core/HelloStream.java @@ -18,9 +18,7 @@ package org.apache.solr.core; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -67,14 +65,10 @@ public class HelloStream extends TupleStream implements Expressible{ @Override public Tuple read() throws IOException { if (isSentHelloWorld) { - Map m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } else { isSentHelloWorld = true; - Map m = new HashMap<>(); - m.put("msg", "Hello World!"); - return new Tuple(m); + return new Tuple("msg", "Hello World!"); } } diff --git a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java index 4bd21fe36b2..c033b795ea5 100644 --- a/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java +++ b/solr/core/src/test/org/apache/solr/handler/export/TestExportWriter.java @@ -42,6 +42,8 @@ import org.junit.BeforeClass; import org.junit.Test; public class TestExportWriter extends SolrTestCaseJ4 { + + private ObjectMapper mapper = new ObjectMapper(); @BeforeClass public static void beforeClass() throws Exception { @@ -706,6 +708,56 @@ public class TestExportWriter extends SolrTestCaseJ4 { validateSort(numDocs); } + private void createLargeIndex() throws Exception { + int BATCH_SIZE = 1000; + int NUM_BATCHES = 100; + SolrInputDocument[] docs = new SolrInputDocument[BATCH_SIZE]; + for (int i = 0; i < NUM_BATCHES; i++) { + for (int j = 0; j < BATCH_SIZE; j++) { + docs[j] = new SolrInputDocument( + "id", String.valueOf(i * BATCH_SIZE + j), + "batch_i_p", String.valueOf(i), + "random_i_p", String.valueOf(random().nextInt(BATCH_SIZE)), + "sortabledv", TestUtil.randomSimpleString(random(), 2, 3), + "sortabledv_udvas", String.valueOf(random().nextInt(100)), + "small_i_p", String.valueOf((i + j) % 7) + ); + } + updateJ(jsonAdd(docs), null); + } + assertU(commit()); + } + + @Test + public void testExpr() throws Exception { + assertU(delQ("*:*")); + assertU(commit()); + createLargeIndex(); + SolrQueryRequest req = req("q", "*:*", "qt", "/export", "fl", "id", "sort", "id asc", "expr", "top(n=2,input(),sort=\"id desc\")"); + assertJQ(req, + 
"response/numFound==100000", + "response/docs/[0]/id=='99999'", + "response/docs/[1]/id=='99998'" + ); + req = req("q", "*:*", "qt", "/export", "fl", "id,sortabledv_udvas", "sort", "sortabledv_udvas asc", "expr", "unique(input(),over=\"sortabledv_udvas\")"); + String rsp = h.query(req); + Map rspMap = mapper.readValue(rsp, HashMap.class); + List> docs = (List>) Utils.getObjectByPath(rspMap, false, "/response/docs"); + assertNotNull("missing document results: " + rspMap, docs); + assertEquals("wrong number of unique docs", 100, docs.size()); + for (int i = 0; i < 99; i++) { + boolean found = false; + String si = String.valueOf(i); + for (int j = 0; j < docs.size(); j++) { + if (docs.get(j).get("sortabledv_udvas").equals(si)) { + found = true; + break; + } + } + assertTrue("missing value " + i + " in results", found); + } + } + private void validateSort(int numDocs) throws Exception { // 10 fields List fieldNames = new ArrayList<>(Arrays.asList("floatdv", "intdv", "stringdv", "longdv", "doubledv", @@ -727,7 +779,6 @@ public class TestExportWriter extends SolrTestCaseJ4 { String fieldsStr = String.join(",", fieldStrs); // fl : field1, field2 String resp = h.query(req("q", "*:*", "qt", "/export", "fl", "id," + fieldsStr, "sort", sortStr)); - ObjectMapper mapper = new ObjectMapper(); HashMap respMap = mapper.readValue(resp, HashMap.class); List docs = (ArrayList) ((HashMap) respMap.get("response")).get("docs"); diff --git a/solr/solr-ref-guide/src/exporting-result-sets.adoc b/solr/solr-ref-guide/src/exporting-result-sets.adoc index b0565d87931..8a072f24faa 100644 --- a/solr/solr-ref-guide/src/exporting-result-sets.adoc +++ b/solr/solr-ref-guide/src/exporting-result-sets.adoc @@ -59,6 +59,34 @@ It can get worse otherwise. The `fl` property defines the fields that will be exported with the result set. Any of the field types that can be sorted (i.e., int, long, float, double, string, date, boolean) can be used in the field list. The fields can be single or multi-valued. However, returning scores and wildcards are not supported at this time. +=== Specifying the Local Streaming Expression + +The optional `expr` property defines a <> that allows documents to be processed locally before they are exported in the result set. + +Expressions have to use a special `input()` stream that represents original results from the `/export` handler. Output from the stream expression then becomes the output from the `/export` handler. The `&streamLocalOnly=true` flag is always set for this streaming expression. + +Only stream <> and <> are supported in these expressions - using any of the <> expressions except for the pre-defined `input()` will result in an error. + +Using stream expressions with the `/export` handler may result in dramatic performance improvements due to the local in-memory reduction of the number of documents to be returned. + +Here's an example of using `top` decorator for returning only top N results: +[source,text] +---- +http://localhost:8983/solr/core_name/export?q=my-query&sort=timestamp+desc,&fl=timestamp,reporter,severity&expr=top(n=2,input(),sort="timestamp+desc") +---- + +(Note that the sort spec in the `top` decorator must match the sort spec in the +handler parameter). + +Here's an example of using `unique` decorator: + +[source,text] +---- +http://localhost:8983/solr/core_name/export?q=my-query&sort=reporter+desc,&fl=reporter&expr=unique(input(),over="reporter") +---- + +(Note that the `over` parameter must use one of the fields requested in the `fl` parameter). 
+ == Distributed Support See the section <> for distributed support. diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java index 56d86fe260d..2bdb2aa227c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Tuple.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.solr.common.MapWriter; +import org.apache.solr.common.params.StreamParams; /** * A simple abstraction of a record containing key/value pairs. @@ -40,28 +41,58 @@ public class Tuple implements Cloneable, MapWriter { * The EOF Tuple will not contain a record from the stream, but it may contain * metrics/aggregates gathered by underlying streams. * */ - public boolean EOF; + /** + * When EXCEPTION field is true the Tuple marks an exception in the stream + * and the corresponding "EXCEPTION" field contains a related message. + */ public boolean EXCEPTION; - public Map fields = new HashMap(); + /** + * Tuple fields. + * @deprecated use {@link #getFields()} instead of this public field. + */ + public Map fields = new HashMap<>(2); + /** + * External serializable field names. + * @deprecated use {@link #getFieldNames()} instead of this public field. + */ public List fieldNames; + /** + * Mapping of external field names to internal tuple field names. + * @deprecated use {@link #getFieldLabels()} instead of this public field. + */ public Map fieldLabels; - public Tuple(){ + public Tuple() { // just an empty tuple } - - public Tuple(Map fields) { - if(fields.containsKey("EOF")) { - EOF = true; - } - if(fields.containsKey("EXCEPTION")){ - EXCEPTION = true; + /** + * A copy constructor. + * @param fields map containing keys and values to be copied to this tuple + */ + public Tuple(Map fields) { + for (Map.Entry entry : fields.entrySet()) { + put(entry.getKey(), entry.getValue()); } + } - this.fields.putAll(fields); + /** + * Constructor that accepts an even number of arguments as key / value pairs. + * @param fields a list of key / value pairs, with keys at odd and values at + * even positions. + */ + public Tuple(Object... 
fields) { + if (fields == null) { + return; + } + if ((fields.length % 2) != 0) { + throw new RuntimeException("must have a matching number of key-value pairs"); + } + for (int i = 0; i < fields.length; i += 2) { + put(fields[i], fields[i + 1]); + } } public Object get(Object key) { @@ -70,9 +101,14 @@ public class Tuple implements Cloneable, MapWriter { public void put(Object key, Object value) { this.fields.put(key, value); + if (key.equals(StreamParams.EOF)) { + EOF = true; + } else if (key.equals(StreamParams.EXCEPTION)) { + EXCEPTION = true; + } } - - public void remove(Object key){ + + public void remove(Object key) { this.fields.remove(key); } @@ -80,16 +116,16 @@ public class Tuple implements Cloneable, MapWriter { return String.valueOf(this.fields.get(key)); } - public String getException(){ return (String)this.fields.get("EXCEPTION"); } + public String getException() { return (String)this.fields.get(StreamParams.EXCEPTION); } public Long getLong(Object key) { Object o = this.fields.get(key); - if(o == null) { + if (o == null) { return null; } - if(o instanceof Long) { + if (o instanceof Long) { return (Long) o; } else if (o instanceof Number) { return ((Number)o).longValue(); @@ -149,11 +185,11 @@ public class Tuple implements Cloneable, MapWriter { public Double getDouble(Object key) { Object o = this.fields.get(key); - if(o == null) { + if (o == null) { return null; } - if(o instanceof Double) { + if (o instanceof Double) { return (Double)o; } else { //Attempt to parse the double @@ -173,39 +209,78 @@ public class Tuple implements Cloneable, MapWriter { return (List)this.fields.get(key); } + /** + * Return all tuple fields and their values. + */ + public Map getFields() { + return this.fields; + } + + /** + * Return all tuple fields. + * @deprecated use {@link #getFields()} instead. + */ + @Deprecated(since = "8.6.0") public Map getMap() { return this.fields; } + /** + * This represents the mapping of external field labels to the tuple's + * internal field names if they are different from field names. + * @return field labels or null + */ + public Map getFieldLabels() { + return fieldLabels; + } + + public void setFieldLabels(Map fieldLabels) { + this.fieldLabels = fieldLabels; + } + + /** + * A list of field names to serialize. This list (together with + * the mapping in {@link #getFieldLabels()} determines what tuple values + * are serialized and their external (serialized) names. 
+ * @return list of external field names or null + */ + public List getFieldNames() { + return fieldNames; + } + + public void setFieldNames(List fieldNames) { + this.fieldNames = fieldNames; + } + public List getMaps(Object key) { - return (List)this.fields.get(key); + return (List) this.fields.get(key); } public void setMaps(Object key, List maps) { this.fields.put(key, maps); } - public Map getMetrics() { - return (Map)this.fields.get("_METRICS_"); + public Map getMetrics() { + return (Map) this.fields.get(StreamParams.METRICS); } public void setMetrics(Map metrics) { - this.fields.put("_METRICS_", metrics); + this.fields.put(StreamParams.METRICS, metrics); } public Tuple clone() { - HashMap m = new HashMap(fields); - Tuple clone = new Tuple(m); + Tuple clone = new Tuple(); + clone.fields.putAll(fields); return clone; } - public void merge(Tuple other){ - fields.putAll(other.getMap()); + public void merge(Tuple other) { + fields.putAll(other.getFields()); } @Override public void writeMap(EntryWriter ew) throws IOException { - if(fieldNames == null) { + if (fieldNames == null) { fields.forEach((k, v) -> { try { ew.put((String) k, v); @@ -214,10 +289,33 @@ public class Tuple implements Cloneable, MapWriter { } }); } else { - for(String fieldName : fieldNames) { + for (String fieldName : fieldNames) { String label = fieldLabels.get(fieldName); ew.put(label, fields.get(label)); } } } + + /** + * Create a new empty tuple marked as EOF. + */ + public static Tuple EOF() { + Tuple tuple = new Tuple(); + tuple.put(StreamParams.EOF, true); + return tuple; + } + + /** + * Create a new empty tuple marked as EXCEPTION, and optionally EOF. + * @param msg exception message + * @param eof if true the tuple will be marked as EOF + */ + public static Tuple EXCEPTION(String msg, boolean eof) { + Tuple tuple = new Tuple(); + tuple.put(StreamParams.EXCEPTION, msg); + if (eof) { + tuple.put(StreamParams.EOF, true); + } + return tuple; + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java index f859392720b..b570712818d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/AnovaEvaluator.java @@ -18,16 +18,15 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.math3.stat.inference.OneWayAnova; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class AnovaEvaluator extends RecursiveNumericListEvaluator implements ManyValueWorker { protected static final long serialVersionUID = 1L; @@ -55,10 +54,10 @@ public class AnovaEvaluator extends RecursiveNumericListEvaluator implements Man OneWayAnova anova = new OneWayAnova(); double p = anova.anovaPValue(anovaInput); double f = anova.anovaFValue(anovaInput); - Map m = new HashMap<>(); - m.put("p-value", p); - m.put("f-ratio", f); - return new Tuple(m); + Tuple tuple = new Tuple(); + tuple.put(StreamParams.P_VALUE, p); + tuple.put("f-ratio", f); + return tuple; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java 
b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java index 70131d09a7f..26ab319ce32 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/ChiSquareDataSetEvaluator.java @@ -18,14 +18,13 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.math3.stat.inference.ChiSquareTest; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class ChiSquareDataSetEvaluator extends RecursiveNumericListEvaluator implements TwoValueWorker { @@ -58,10 +57,10 @@ public class ChiSquareDataSetEvaluator extends RecursiveNumericListEvaluator imp double chiSquare = chiSquareTest.chiSquareDataSetsComparison(sampleA, sampleB); double p = chiSquareTest.chiSquareTestDataSetsComparison(sampleA, sampleB); - Map m = new HashMap<>(); - m.put("chisquare-statistic", chiSquare); - m.put("p-value", p); - return new Tuple(m); + Tuple tuple = new Tuple(); + tuple.put("chisquare-statistic", chiSquare); + tuple.put(StreamParams.P_VALUE, p); + return tuple; } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java index 2fce7a0a6a8..27ef0de392c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/DescribeEvaluator.java @@ -17,10 +17,8 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.solr.client.solrj.io.Tuple; @@ -49,20 +47,20 @@ public class DescribeEvaluator extends RecursiveNumericEvaluator implements OneV DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics(); ((List)value).stream().mapToDouble(innerValue -> ((Number)innerValue).doubleValue()).forEach(innerValue -> descriptiveStatistics.addValue(innerValue)); - Map map = new HashMap<>(); - map.put("max", descriptiveStatistics.getMax()); - map.put("mean", descriptiveStatistics.getMean()); - map.put("min", descriptiveStatistics.getMin()); - map.put("stdev", descriptiveStatistics.getStandardDeviation()); - map.put("sum", descriptiveStatistics.getSum()); - map.put("N", descriptiveStatistics.getN()); - map.put("var", descriptiveStatistics.getVariance()); - map.put("kurtosis", descriptiveStatistics.getKurtosis()); - map.put("skewness", descriptiveStatistics.getSkewness()); - map.put("popVar", descriptiveStatistics.getPopulationVariance()); - map.put("geometricMean", descriptiveStatistics.getGeometricMean()); - map.put("sumsq", descriptiveStatistics.getSumsq()); + Tuple tuple = new Tuple(); + tuple.put("max", descriptiveStatistics.getMax()); + tuple.put("mean", descriptiveStatistics.getMean()); + tuple.put("min", descriptiveStatistics.getMin()); + tuple.put("stdev", descriptiveStatistics.getStandardDeviation()); + tuple.put("sum", descriptiveStatistics.getSum()); + tuple.put("N", descriptiveStatistics.getN()); + tuple.put("var", 
descriptiveStatistics.getVariance()); + tuple.put("kurtosis", descriptiveStatistics.getKurtosis()); + tuple.put("skewness", descriptiveStatistics.getSkewness()); + tuple.put("popVar", descriptiveStatistics.getPopulationVariance()); + tuple.put("geometricMean", descriptiveStatistics.getGeometricMean()); + tuple.put("sumsq", descriptiveStatistics.getSumsq()); - return new Tuple(map); + return tuple; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java index efab9025c99..200017870d4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/FrequencyTableEvaluator.java @@ -19,11 +19,9 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.commons.math3.stat.Frequency; @@ -72,13 +70,13 @@ public class FrequencyTableEvaluator extends RecursiveNumericEvaluator implement while(iterator.hasNext()){ Long value = (Long)iterator.next(); - Map map = new HashMap<>(); - map.put("value", value.longValue()); - map.put("count", frequency.getCount(value)); - map.put("cumFreq", frequency.getCumFreq(value)); - map.put("cumPct", frequency.getCumPct(value)); - map.put("pct", frequency.getPct(value)); - histogramBins.add(new Tuple(map)); + Tuple tuple = new Tuple(); + tuple.put("value", value.longValue()); + tuple.put("count", frequency.getCount(value)); + tuple.put("cumFreq", frequency.getCumFreq(value)); + tuple.put("cumPct", frequency.getCumPct(value)); + tuple.put("pct", frequency.getPct(value)); + histogramBins.add(tuple); } return histogramBins; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java index 6aa5d8ac703..e83b4f90719 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/GTestDataSetEvaluator.java @@ -18,14 +18,13 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.math3.stat.inference.GTest; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class GTestDataSetEvaluator extends RecursiveNumericListEvaluator implements TwoValueWorker { @@ -58,9 +57,9 @@ public class GTestDataSetEvaluator extends RecursiveNumericListEvaluator impleme double g = gTest.gDataSetsComparison(sampleA, sampleB); double p = gTest.gTestDataSetsComparison(sampleA, sampleB); - Map m = new HashMap<>(); - m.put("G-statistic", g); - m.put("p-value", p); - return new Tuple(m); + Tuple tuple = new Tuple(); + tuple.put("G-statistic", g); + tuple.put(StreamParams.P_VALUE, p); + return tuple; } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java index 8d2761469f9..fd6fcf65c84 100644 --- 
a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/HistogramEvaluator.java @@ -19,10 +19,8 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.commons.math3.random.EmpiricalDistribution; import org.apache.commons.math3.stat.descriptive.SummaryStatistics; @@ -71,17 +69,17 @@ public class HistogramEvaluator extends RecursiveNumericEvaluator implements Man List histogramBins = new ArrayList<>(); for(SummaryStatistics binSummary : distribution.getBinStats()) { - Map map = new HashMap<>(); - map.put("max", binSummary.getMax()); - map.put("mean", binSummary.getMean()); - map.put("min", binSummary.getMin()); - map.put("stdev", binSummary.getStandardDeviation()); - map.put("sum", binSummary.getSum()); - map.put("N", binSummary.getN()); - map.put("var", binSummary.getVariance()); - map.put("cumProb", distribution.cumulativeProbability(binSummary.getMean())); - map.put("prob", distribution.probability(binSummary.getMin(), binSummary.getMax())); - histogramBins.add(new Tuple(map)); + Tuple tuple = new Tuple(); + tuple.put("max", binSummary.getMax()); + tuple.put("mean", binSummary.getMean()); + tuple.put("min", binSummary.getMin()); + tuple.put("stdev", binSummary.getStandardDeviation()); + tuple.put("sum", binSummary.getSum()); + tuple.put("N", binSummary.getN()); + tuple.put("var", binSummary.getVariance()); + tuple.put("cumProb", distribution.cumulativeProbability(binSummary.getMean())); + tuple.put("prob", distribution.probability(binSummary.getMin(), binSummary.getMax())); + histogramBins.add(tuple); } return histogramBins; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java index 58e783e7a29..27256b14d57 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/KolmogorovSmirnovEvaluator.java @@ -17,16 +17,15 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.commons.math3.distribution.RealDistribution; import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class KolmogorovSmirnovEvaluator extends RecursiveObjectEvaluator implements TwoValueWorker { @@ -54,17 +53,17 @@ public class KolmogorovSmirnovEvaluator extends RecursiveObjectEvaluator impleme if(first instanceof RealDistribution){ RealDistribution realDistribution = (RealDistribution)first; - Map m = new HashMap<>(); - m.put("p-value", ks.kolmogorovSmirnovTest(realDistribution, data)); - m.put("d-statistic", ks.kolmogorovSmirnovStatistic(realDistribution, data)); - return new Tuple(m); + Tuple tuple = new Tuple(); + tuple.put(StreamParams.P_VALUE, ks.kolmogorovSmirnovTest(realDistribution, data)); + tuple.put("d-statistic", ks.kolmogorovSmirnovStatistic(realDistribution, data)); + return tuple; } else if(first instanceof 
List && ((List) first).stream().noneMatch(item -> !(item instanceof Number))){ double[] data2 = ((List)first).stream().mapToDouble(item -> ((Number)item).doubleValue()).toArray(); - - Map m = new HashMap<>(); - m.put("d-statistic", ks.kolmogorovSmirnovTest(data, data2)); - return new Tuple(m); + + Tuple tuple = new Tuple(); + tuple.put("d-statistic", ks.kolmogorovSmirnovTest(data, data2)); + return tuple; } else{ throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - found type %s for the first value, expecting a RealDistribution or list of numbers",toExpression(constructingFactory), first.getClass().getSimpleName())); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java index d8cb214e7a6..6c6e278724b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/MannWhitneyUEvaluator.java @@ -19,16 +19,15 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.math3.stat.inference.MannWhitneyUTest; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class MannWhitneyUEvaluator extends RecursiveNumericListEvaluator implements ManyValueWorker { @@ -52,10 +51,10 @@ public class MannWhitneyUEvaluator extends RecursiveNumericListEvaluator impleme MannWhitneyUTest mannwhitneyutest = new MannWhitneyUTest(); double u = mannwhitneyutest.mannWhitneyU(mannWhitneyUInput.get(0), mannWhitneyUInput.get(1)); double p = mannwhitneyutest.mannWhitneyUTest(mannWhitneyUInput.get(0), mannWhitneyUInput.get(1)); - Map m = new HashMap<>(); - m.put("u-statistic", u); - m.put("p-value", p); - return new Tuple(m); + Tuple tuple = new Tuple(); + tuple.put("u-statistic", u); + tuple.put(StreamParams.P_VALUE, p); + return tuple; }else{ throw new IOException(String.format(Locale.ROOT,"%s(...) 
only works with a list of 2 arrays but a list of %d array(s) was provided.", constructingFactory.getFunctionName(getClass()), mannWhitneyUInput.size())); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java index 0f2474dad74..ac6b8542e06 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/OutliersEvaluator.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.commons.math3.distribution.IntegerDistribution; @@ -77,7 +76,7 @@ public class OutliersEvaluator extends RecursiveObjectEvaluator implements ManyV } else { tuples = new ArrayList<>(); for(int i=0; i values1 = (List)value1; @@ -66,7 +63,7 @@ public class PairedTTestEvaluator extends RecursiveNumericListEvaluator implemen double tstat = tTest.pairedT(samples1, samples2); double pval = tTest.pairedTTest(samples1, samples2); tuple.put("t-statistic", tstat); - tuple.put("p-value", pval); + tuple.put(StreamParams.P_VALUE, pval); return tuple; } else { throw new IOException("Second parameter for pairedTtest must be a double array"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java index 30455be4ea6..04f987ae0a0 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/RecursiveEvaluator.java @@ -22,8 +22,6 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.List; import java.util.Locale; import java.util.Set; @@ -119,7 +117,6 @@ public abstract class RecursiveEvaluator implements StreamEvaluator, ValueWorker } - @SuppressWarnings({"unchecked"}) protected Object normalizeOutputType(Object value) { if(null == value){ return null; @@ -145,15 +142,13 @@ public abstract class RecursiveEvaluator implements StreamEvaluator, ValueWorker //If its a tuple and not a inner class that has extended tuple, which is done in a number of cases so that mathematical models //can be contained within a tuple. 
- @SuppressWarnings({"rawtypes"}) Tuple tuple = (Tuple)value; - @SuppressWarnings({"rawtypes"}) - Map map = new HashMap(); - for(Object o : tuple.fields.keySet()) { - Object v = tuple.fields.get(o); - map.put(o, normalizeOutputType(v)); + Tuple newTuple = new Tuple(); + for(Object o : tuple.getFields().keySet()) { + Object v = tuple.get(o); + newTuple.put(o, normalizeOutputType(v)); } - return new Tuple(map); + return newTuple; } else{ // anything else can just be returned as is diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java index fb85e8d4c4d..c56ecc2d87e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/SetValueEvaluator.java @@ -19,8 +19,6 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; import java.util.Locale; -import java.util.Map; -import java.util.HashMap; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; @@ -38,7 +36,6 @@ public class SetValueEvaluator extends RecursiveObjectEvaluator implements ManyV } @Override - @SuppressWarnings({"unchecked"}) public Object doWork(Object... values) throws IOException { if(values[0] instanceof Tuple) { Tuple tuple = (Tuple)values[0]; @@ -48,10 +45,9 @@ public class SetValueEvaluator extends RecursiveObjectEvaluator implements ManyV value = ((String)value).replace("\"", ""); } key = key.replace("\"", ""); - @SuppressWarnings({"rawtypes"}) - Map map = new HashMap(tuple.fields); - map.put(key, value); - return new Tuple(map); + Tuple newTuple = tuple.clone(); + newTuple.put(key, value); + return newTuple; } else { throw new IOException("The setValue function expects a Tuple as the first parameter"); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java index dc8b37afddd..acc5e80d272 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/eval/TTestEvaluator.java @@ -17,15 +17,14 @@ package org.apache.solr.client.solrj.io.eval; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.commons.math3.stat.inference.TTest; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValueWorker { protected static final long serialVersionUID = 1L; @@ -42,9 +41,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu public Object doWork(Object value1, Object value2) throws IOException { TTest tTest = new TTest(); - @SuppressWarnings({"rawtypes"}) - Map map = new HashMap(); - Tuple tuple = new Tuple(map); + Tuple tuple = new Tuple(); if(value1 instanceof Number) { double mean = ((Number) value1).doubleValue(); @@ -60,7 +57,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu double pval = tTest.tTest(mean, samples); tuple.put("t-statistic", tstat); - tuple.put("p-value", pval); + tuple.put(StreamParams.P_VALUE, pval); 
return tuple; } else { throw new IOException("Second parameter for ttest must be a double array"); @@ -87,7 +84,7 @@ public class TTestEvaluator extends RecursiveNumericEvaluator implements TwoValu double tstat = tTest.t(samples1, samples2); double pval = tTest.tTest(samples1, samples2); tuple.put("t-statistic", tstat); - tuple.put("p-value", pval); + tuple.put(StreamParams.P_VALUE, pval); return tuple; } else { throw new IOException("Second parameter for ttest must be a double array"); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java index 5796065ddf4..702d03486eb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java @@ -613,10 +613,7 @@ public class GatherNodesStream extends TupleStream implements Expressible { if (out.hasNext()) { return out.next(); } else { - Map map = new HashMap(); - map.put("EOF", true); - Tuple tuple = new Tuple(map); - return tuple; + return Tuple.EOF(); } } @@ -645,14 +642,10 @@ public class GatherNodesStream extends TupleStream implements Expressible { public void setStreamContext(StreamContext context) {} public Tuple read() { - HashMap map = new HashMap(); if(it.hasNext()) { - map.put("node",it.next()); - return new Tuple(map); + return new Tuple("node",it.next()); } else { - - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java index befa5a7721c..7c99f75758f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java @@ -51,12 +51,12 @@ public class Node { } public Tuple toTuple(String collection, String field, int level, Traversal traversal) { - Map map = new HashMap(); + Tuple tuple = new Tuple(); - map.put("node", id); - map.put("collection", collection); - map.put("field", field); - map.put("level", level); + tuple.put("node", id); + tuple.put("collection", collection); + tuple.put("field", field); + tuple.put("level", level); boolean prependCollection = traversal.isMultiCollection(); List cols = traversal.getCollections(); @@ -76,15 +76,15 @@ public class Node { } } - map.put("ancestors", l); + tuple.put("ancestors", l); } if(metrics != null) { for(Metric metric : metrics) { - map.put(metric.getIdentifier(), metric.getValue()); + tuple.put(metric.getIdentifier(), metric.getValue()); } } - return new Tuple(map); + return tuple; } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java index 314ab92d5cf..9d12e48df34 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/ShortestPathStream.java @@ -403,8 +403,7 @@ public class ShortestPathStream extends TupleStream implements Expressible { for(LinkedList p : paths) { String s = p.toString(); if (!finalPaths.contains(s)){ - Tuple shortestPath = new Tuple(new HashMap()); - shortestPath.put("path", p); + Tuple shortestPath = new Tuple("path", p); shortestPaths.add(shortestPath); finalPaths.add(s); } @@ -501,12 +500,11 @@ public class 
ShortestPathStream extends TupleStream implements Expressible { Tuple t = shortestPaths.removeFirst(); return t; } else { - Map m = new HashMap(); - m.put("EOF", true); + Tuple tuple = Tuple.EOF(); if(!found) { - m.put("sorry", "No path found"); + tuple.put("sorry", "No path found"); } - return new Tuple(m); + return tuple; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java index 3db76ec2047..59d83c5c1d8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/ops/GroupOperation.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -108,15 +107,15 @@ public class GroupOperation implements ReduceOperation { public Tuple reduce() { LinkedList ll = new LinkedList(); while(priorityQueue.size() > 0) { - ll.addFirst(priorityQueue.poll().getMap()); + ll.addFirst(priorityQueue.poll().getFields()); //This will clear priority queue and so it will be ready for the next group. } List list = new ArrayList(ll); Map groupHead = list.get(0); - Map map = new HashMap(groupHead); - map.put("group", list); - return new Tuple(map); + Tuple tuple = new Tuple(groupHead); + tuple.put("group", list); + return tuple; } public void operate(Tuple tuple) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java index 49b5953b212..d2efbb248dc 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CalculatorStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; @@ -86,16 +85,11 @@ public class CalculatorStream extends TupleStream implements Expressible { public Tuple read() throws IOException { - if(finished) { - HashMap m = new HashMap(); - m.put("EOF", true); - Tuple tuple = new Tuple(m); - return tuple; + if (finished) { + return Tuple.EOF(); } else { - HashMap m = new HashMap(); - Tuple tuple = new Tuple(m); finished = true; - return tuple; + return new Tuple(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java index a87c9ee6fa4..fac8ca813d6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CellStream.java @@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -134,9 +132,8 @@ public class CellStream extends TupleStream implements Expressible { } } - Map map = new HashMap(); - map.put(name, list); - tuple = new Tuple(map); + tuple = new Tuple(); + tuple.put(name, list); } finally { stream.close(); } diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java index 95cf2399528..dfa0211a67c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java @@ -453,14 +453,11 @@ public class CloudSolrStream extends TupleStream implements Expressible { } return t; } else { - Map m = new HashMap(); + Tuple tuple = Tuple.EOF(); if(trace) { - m.put("_COLLECTION_", this.collection); + tuple.put("_COLLECTION_", this.collection); } - - m.put("EOF", true); - - return new Tuple(m); + return tuple; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java index b29ea09f6a0..5885862d601 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CommitStream.java @@ -123,7 +123,7 @@ public class CommitStream extends TupleStream implements Expressible { // if the read document contains field 'batchIndexed' then it's a summary // document and we can update our count based on it's value. If not then // just increment by 1 - if(tuple.fields.containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){ + if(tuple.getFields().containsKey(UpdateStream.BATCH_INDEXED_FIELD_NAME) && isInteger(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME))){ docsSinceCommit += Integer.parseInt(tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME)); } else{ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java index 561204fa302..386cb5d5aaa 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CsvStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -130,7 +129,7 @@ public class CsvStream extends TupleStream implements Expressible { if(fields.length != headers.length) { throw new IOException("Headers and lines must have the same number of fields [file:"+file+" line number:"+lineNumber+"]"); } - Tuple out = new Tuple(new HashMap()); + Tuple out = new Tuple(); out.put("id", file+"_"+lineNumber); for(int i=0; i 0) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java index 0257be9e3d3..962b61a0de5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java @@ -21,7 +21,6 @@ import java.lang.Thread.State; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -252,13 +251,13 @@ public class DaemonStream extends TupleStream implements Expressible { } public synchronized Tuple getInfo() { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put(ID, id); tuple.put("startTime", startTime); 
tuple.put("stopTime", stopTime); tuple.put("iterations", iterations.get()); tuple.put("state", streamRunner.getState().toString()); - if(exception != null) { + if (exception != null) { tuple.put("exception", exception.getMessage()); } @@ -338,7 +337,7 @@ public class DaemonStream extends TupleStream implements Expressible { Tuple tuple = tupleStream.read(); if (tuple.EOF) { errors = 0; // Reset errors on successful run. - if (tuple.fields.containsKey("sleepMillis")) { + if (tuple.getFields().containsKey("sleepMillis")) { this.sleepMillis = tuple.getLong("sleepMillis"); if(terminate && sleepMillis > 0) { @@ -400,11 +399,8 @@ public class DaemonStream extends TupleStream implements Expressible { } if(!eatTuples) { - Map m = new HashMap(); - m.put("EOF", true); - Tuple tuple = new Tuple(m); try { - queue.put(tuple); + queue.put(Tuple.EOF()); } catch (InterruptedException e) { log.error("Error in DaemonStream:{}", id, e); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java index 9c9c2015cdd..1d53604e8e8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java @@ -403,14 +403,11 @@ public class DeepRandomStream extends TupleStream implements Expressible { } return t; } else { - Map m = new HashMap(); + Tuple tuple = Tuple.EOF(); if(trace) { - m.put("_COLLECTION_", this.collection); + tuple.put("_COLLECTION_", this.collection); } - - m.put("EOF", true); - - return new Tuple(m); + return tuple; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java index 38e1cca7186..7749a0f3bb8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/EchoStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; @@ -96,16 +95,10 @@ public class EchoStream extends TupleStream implements Expressible { public Tuple read() throws IOException { if(finished) { - HashMap m = new HashMap(); - m.put("EOF", true); - Tuple tuple = new Tuple(m); - return tuple; + return Tuple.EOF(); } else { - HashMap m = new HashMap(); - m.put("echo", echo); - Tuple tuple = new Tuple(m); finished = true; - return tuple; + return new Tuple("echo", echo); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java index 9d1f45081c3..d6cabf14f62 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExceptionStream.java @@ -18,9 +18,7 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.lang.invoke.MethodHandles; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -60,21 +58,15 @@ public class ExceptionStream extends TupleStream { public Tuple read() { if(openException != null) { //There was an exception during the open. 
- Map fields = new HashMap(); - fields.put("EXCEPTION", openException.getMessage()); - fields.put("EOF", true); SolrException.log(log, openException); - return new Tuple(fields); + return Tuple.EXCEPTION(openException.getMessage(), true); } try { return stream.read(); } catch (Exception e) { - Map fields = new HashMap(); - fields.put("EXCEPTION", e.getMessage()); - fields.put("EOF", true); SolrException.log(log, e); - return new Tuple(fields); + return Tuple.EXCEPTION(e.getMessage(), true); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java index 2ccb147d472..7f3d63540a1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/Facet2DStream.java @@ -20,11 +20,9 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; @@ -308,11 +306,8 @@ public class Facet2DStream extends TupleStream implements Expressible { if (out.hasNext()) { return out.next(); } else { - Map fields = new HashMap(); - fields.put("rows", tuples.size()); - - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); + Tuple tuple = Tuple.EOF(); + tuple.put("rows", tuples.size()); return tuple; } @@ -395,7 +390,7 @@ public class Facet2DStream extends TupleStream implements Expressible { } private void getTuples(NamedList response, Bucket x, Bucket y, Metric metric) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); NamedList facets = (NamedList) response.get("facets"); fillTuples(0, tuples, tuple, facets, x, y, metric); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java index 6e96cfd9485..29c48ae8132 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java @@ -20,10 +20,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; @@ -626,15 +624,11 @@ public class FacetStream extends TupleStream implements Expressible { ++index; return tuple; } else { - Map fields = new HashMap(); + Tuple tuple = Tuple.EOF(); if(bucketSizeLimit == Integer.MAX_VALUE) { - fields.put("totalRows", tuples.size()); + tuple.put("totalRows", tuples.size()); } - - fields.put("EOF", true); - - Tuple tuple = new Tuple(fields); return tuple; } } @@ -771,7 +765,7 @@ public class FacetStream extends TupleStream implements Expressible { Bucket[] buckets, Metric[] metrics) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); NamedList facets = (NamedList)response.get("facets"); fillTuples(0, tuples, diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java index a9963d06a38..041e7c321b5 100644 --- 
a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java @@ -362,21 +362,19 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{ for (Map.Entry termScore : termScores.entrySet()) { if (tuples.size() == numTerms) break; index++; - Map map = new HashMap(); - map.put(ID, featureSet + "_" + index); - map.put("index_i", index); - map.put("term_s", termScore.getKey()); - map.put("score_f", termScore.getValue()); - map.put("featureSet_s", featureSet); + Tuple tuple = new Tuple(); + tuple.put(ID, featureSet + "_" + index); + tuple.put("index_i", index); + tuple.put("term_s", termScore.getKey()); + tuple.put("score_f", termScore.getValue()); + tuple.put("featureSet_s", featureSet); long docFreq = docFreqs.get(termScore.getKey()); double d = Math.log(((double)numDocs / (double)(docFreq + 1))); - map.put("idf_d", d); - tuples.add(new Tuple(map)); + tuple.put("idf_d", d); + tuples.add(tuple); } - Map map = new HashMap(); - map.put("EOF", true); - tuples.add(new Tuple(map)); + tuples.add(Tuple.EOF()); tupleIterator = tuples.iterator(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java index 1655bfba0fb..8b827614cba 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/GetStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -89,14 +88,11 @@ public class GetStream extends TupleStream implements Expressible { } public Tuple read() throws IOException { - Map map = new HashMap(); - if(tupleIterator.hasNext()) { + if (tupleIterator.hasNext()) { Tuple t = tupleIterator.next(); - map.putAll(t.fields); - return new Tuple(map); + return t.clone(); } else { - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java index 8bf82c63151..0754692e658 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/HashRollupStream.java @@ -199,18 +199,16 @@ public class HashRollupStream extends TupleStream implements Expressible { if (tuple.EOF) { List tuples = new ArrayList(); for(Map.Entry entry : metricMap.entrySet()) { - Map map = new HashMap(); + Tuple t = new Tuple(); Metric[] finishedMetrics = entry.getValue(); for (Metric metric : finishedMetrics) { - map.put(metric.getIdentifier(), metric.getValue()); + t.put(metric.getIdentifier(), metric.getValue()); } HashKey hashKey = entry.getKey(); for (int i = 0; i < buckets.length; i++) { - map.put(buckets[i].toString(), hashKey.getParts()[i]); + t.put(buckets[i].toString(), hashKey.getParts()[i]); } - - Tuple t = new Tuple(map); tuples.add(t); } tuples.add(tuple); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java index ea4071501f4..778f60cfc1a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java 
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/JDBCStream.java @@ -31,11 +31,9 @@ import java.sql.Timestamp; import java.sql.Types; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Properties; import org.apache.solr.client.solrj.io.Tuple; @@ -50,6 +48,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParamete import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.params.StreamParams; import static org.apache.solr.common.params.CommonParams.SORT; @@ -515,22 +514,20 @@ public class JDBCStream extends TupleStream implements Expressible { public Tuple read() throws IOException { - try{ - Map fields = new HashMap<>(); - if(resultSet.next()){ + try { + Tuple tuple = new Tuple(); + if (resultSet.next()) { // we have a record - for(ResultSetValueSelector selector : valueSelectors){ - fields.put(selector.getColumnName(), selector.selectValue(resultSet)); + for (ResultSetValueSelector selector : valueSelectors) { + tuple.put(selector.getColumnName(), selector.selectValue(resultSet)); } - } - else{ + } else { // we do not have a record - fields.put("EOF", true); + tuple.put(StreamParams.EOF, true); } - return new Tuple(fields); - } - catch(SQLException e){ + return tuple; + } catch (SQLException e) { throw new IOException(String.format(Locale.ROOT, "Failed to read next record with error '%s'", e.getMessage()), e); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java index c03db3820d3..39c9cd60461 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/KnnStream.java @@ -225,17 +225,14 @@ public class KnnStream extends TupleStream implements Expressible { public Tuple read() throws IOException { if(documentIterator.hasNext()) { - Map map = new HashMap(); + Tuple tuple = new Tuple(); SolrDocument doc = documentIterator.next(); for(Entry entry : doc.entrySet()) { - map.put(entry.getKey(), entry.getValue()); + tuple.put(entry.getKey(), entry.getValue()); } - return new Tuple(map); - } else { - Map fields = new HashMap(); - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); return tuple; + } else { + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java index 33f8fd59bbe..3858df5521c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; @@ -114,9 +113,7 @@ public class ListStream extends TupleStream implements Expressible { streams[streamIndex] = null; currentStream.open(); } else { - HashMap map = new HashMap(); - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } } diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java index ffaf313185a..f13e736a6b8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ModelStream.java @@ -193,9 +193,7 @@ public class ModelStream extends TupleStream implements Expressible { tuple = model; model = null; } else { - Map map = new HashMap(); - map.put("EOF", true); - tuple = new Tuple(map); + tuple = Tuple.EOF(); } return tuple; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java index 8d55c313c05..85a0f554427 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NoOpStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import org.apache.solr.client.solrj.io.Tuple; @@ -88,10 +87,7 @@ public class NoOpStream extends TupleStream implements Expressible { } public Tuple read() throws IOException { - HashMap m = new HashMap(); - m.put("EOF", true); - Tuple tuple = new Tuple(m); - return tuple; + return Tuple.EOF(); } /** Return the stream sort - ie, the order in which records are returned */ diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java index 4f0181b2383..067acb561b9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/NullStream.java @@ -18,7 +18,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Date; @@ -133,7 +132,7 @@ public class NullStream extends TupleStream implements Expressible { if(tuple.EOF) { eof = tuple; long end = new Date().getTime(); - Tuple t = new Tuple(new HashMap()); + Tuple t = new Tuple(); t.put("nullCount", count); t.put("timer", end-start); return t; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java index aeadd90321f..dbf2901d963 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java @@ -19,7 +19,6 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -107,14 +106,12 @@ public class ParallelListStream extends TupleStream implements Expressible { } public Tuple read() throws IOException { - while(true) { + while (true) { if (currentStream == null) { if (streamIndex < streams.length) { currentStream = streams[streamIndex]; } else { - HashMap map = new HashMap(); - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java 
b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java index d81aa5414f9..141411eb8e9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelStream.java @@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.FieldComparator; @@ -217,10 +215,6 @@ public class ParallelStream extends CloudSolrStream implements Expressible { Tuple tuple = _read(); if(tuple.EOF) { - Map m = new HashMap(); - m.put("EOF", true); - Tuple t = new Tuple(m); - /* Map metrics = new HashMap(); Iterator> it = this.eofTuples.entrySet().iterator(); @@ -235,7 +229,7 @@ public class ParallelStream extends CloudSolrStream implements Expressible { t.setMetrics(metrics); } */ - return t; + return Tuple.EOF(); } return tuple; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java index a83349ae5fb..142e3d03910 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/PlotStream.java @@ -150,10 +150,8 @@ public class PlotStream extends TupleStream implements Expressible { public Tuple read() throws IOException { - if(finished) { - Map m = new HashMap<>(); - m.put("EOF", true); - return new Tuple(m); + if (finished) { + return Tuple.EOF(); } else { finished = true; Map values = new HashMap<>(); @@ -197,8 +195,8 @@ public class PlotStream extends TupleStream implements Expressible { values.put("data", xy); Tuple tup = new Tuple(values); - tup.fieldLabels = fieldLabels; - tup.fieldNames = fieldNames; + tup.setFieldLabels(fieldLabels); + tup.setFieldNames(fieldNames); return tup; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java index aca0e3d4af8..20a055c3182 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java @@ -234,25 +234,22 @@ public class RandomStream extends TupleStream implements Expressible { public Tuple read() throws IOException { if(documentIterator.hasNext()) { - Map map = new HashMap(); + Tuple tuple = new Tuple(); SolrDocument doc = documentIterator.next(); // Put the generated x-axis first. If there really is an x field it will overwrite it. 
if(outputX) { - map.put("x", x++); + tuple.put("x", x++); } for(Entry entry : doc.entrySet()) { - map.put(entry.getKey(), entry.getValue()); + tuple.put(entry.getKey(), entry.getValue()); } - return new Tuple(map); - } else { - Map fields = new HashMap(); - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); return tuple; + } else { + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java index c1b6894f3f3..cdd864178e4 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RollupStream.java @@ -18,10 +18,8 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.HashKey; @@ -206,15 +204,14 @@ public class RollupStream extends TupleStream implements Expressible { return tuple; } - Map map = new HashMap(); + Tuple t = new Tuple(); for(Metric metric : currentMetrics) { - map.put(metric.getIdentifier(), metric.getValue()); + t.put(metric.getIdentifier(), metric.getValue()); } for(int i=0; i map = new HashMap(); + t = new Tuple(); for(Metric metric : currentMetrics) { - map.put(metric.getIdentifier(), metric.getValue()); + t.put(metric.getIdentifier(), metric.getValue()); } for(int i=0; i entry : doc.entrySet()) { - map.put(entry.getKey(), entry.getValue()); + tuple.put(entry.getKey(), entry.getValue()); } - return new Tuple(map); - } else { - Map fields = new HashMap(); - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); return tuple; + } else { + return Tuple.EOF(); } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java index c538560da42..aee9c4cfd14 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SelectStream.java @@ -257,8 +257,8 @@ public class SelectStream extends TupleStream implements Expressible { } // create a copy with the limited set of fields - Tuple workingToReturn = new Tuple(new HashMap<>()); - Tuple workingForEvaluators = new Tuple(new HashMap<>()); + Tuple workingToReturn = new Tuple(); + Tuple workingForEvaluators = new Tuple(); //Clear the TupleContext before running the evaluators. //The TupleContext allows evaluators to cache values within the scope of a single tuple. 
@@ -267,7 +267,7 @@ public class SelectStream extends TupleStream implements Expressible { streamContext.getTupleContext().clear(); - for(Object fieldName : original.fields.keySet()){ + for(Object fieldName : original.getFields().keySet()){ workingForEvaluators.put(fieldName, original.get(fieldName)); if(selectedFields.containsKey(fieldName)){ workingToReturn.put(selectedFields.get(fieldName), original.get(fieldName)); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java index c3a120f7c74..ac100b3e591 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SignificantTermsStream.java @@ -335,9 +335,7 @@ public class SignificantTermsStream extends TupleStream implements Expressible{ tuples.add(new Tuple(map)); } - Map map = new HashMap(); - map.put("EOF", true); - tuples.add(new Tuple(map)); + tuples.add(Tuple.EOF()); tupleIterator = tuples.iterator(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java index fad08d2a40b..f3542ea76e8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -43,6 +42,7 @@ import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.StreamParams; import org.apache.solr.common.util.NamedList; /** @@ -202,12 +202,10 @@ public class SolrStream extends TupleStream { if (fields == null) { //Return the EOF tuple. 
- Map m = new HashMap(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } else { - String msg = (String) fields.get("EXCEPTION"); + String msg = (String) fields.get(StreamParams.EXCEPTION); if (msg != null) { HandledException ioException = new HandledException(msg); throw ioException; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java index 0fd724637ea..c05fc3ec201 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StatsStream.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -270,10 +269,7 @@ public class StatsStream extends TupleStream implements Expressible { ++index; return tuple; } else { - Map fields = new HashMap(); - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); - return tuple; + return Tuple.EOF(); } } @@ -308,7 +304,7 @@ public class StatsStream extends TupleStream implements Expressible { private void getTuples(NamedList response, Metric[] metrics) { - this.tuple = new Tuple(new HashMap()); + this.tuple = new Tuple(); NamedList facets = (NamedList)response.get("facets"); fillTuple(tuple, facets, metrics); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java index 819d3ae447f..3e6666d8bd1 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java @@ -442,9 +442,7 @@ public class TextLogitStream extends TupleStream implements Expressible { try { if(++iteration > maxIterations) { - Map map = new HashMap(); - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } else { if (this.idfs == null) { @@ -560,14 +558,13 @@ public class TextLogitStream extends TupleStream implements Expressible { @Override public Tuple read() throws IOException { - HashMap map = new HashMap(); - if(it.hasNext()) { - map.put("term_s",it.next()); - map.put("score_f",1.0); - return new Tuple(map); + if (it.hasNext()) { + Tuple tuple = new Tuple(); + tuple.put("term_s", it.next()); + tuple.put("score_f", 1.0); + return tuple; } else { - map.put("EOF", true); - return new Tuple(map); + return Tuple.EOF(); } } @@ -650,13 +647,13 @@ public class TextLogitStream extends TupleStream implements Expressible { List shardWeights = (List)logit.get("weights"); double shardError = (double)logit.get("error"); - Map map = new HashMap(); + Tuple tuple = new Tuple(); - map.put("error", shardError); - map.put("weights", shardWeights); - map.put("evaluation", logit.get("evaluation")); + tuple.put("error", shardError); + tuple.put("weights", shardWeights); + tuple.put("evaluation", logit.get("evaluation")); - return new Tuple(map); + return tuple; } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java index ee4570d95d7..e9d7e7aa3c6 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TimeSeriesStream.java @@ -22,10 +22,8 @@ import 
java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Optional; import java.util.Map.Entry; import java.util.stream.Collectors; @@ -328,10 +326,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible { ++index; return tuple; } else { - Map fields = new HashMap(); - fields.put("EOF", true); - Tuple tuple = new Tuple(fields); - return tuple; + return Tuple.EOF(); } } @@ -383,7 +378,7 @@ public class TimeSeriesStream extends TupleStream implements Expressible { String field, Metric[] metrics) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); NamedList facets = (NamedList)response.get("facets"); fillTuples(tuples, tuple, facets, field, metrics); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java index a7bca77e86c..7dd6204bc70 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java @@ -152,9 +152,7 @@ public class TupStream extends TupleStream implements Expressible { if(unnestedTuples == null) { if (finished) { - Map m = new HashMap<>(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } else { finished = true; if(unnestedTuple != null) { @@ -167,9 +165,7 @@ public class TupStream extends TupleStream implements Expressible { if(unnestedTuples.hasNext()) { return unnestedTuples.next(); } else { - Map m = new HashMap<>(); - m.put("EOF", true); - return new Tuple(m); + return Tuple.EOF(); } } } @@ -234,8 +230,8 @@ public class TupStream extends TupleStream implements Expressible { } } this.tup = new Tuple(values); - tup.fieldNames = fieldNames; - tup.fieldLabels = fieldLabels; + tup.setFieldNames(fieldNames); + tup.setFieldLabels(fieldLabels); // nothing to do here } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java index 5313f147703..453f842220c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/UpdateStream.java @@ -19,10 +19,8 @@ package org.apache.solr.client.solrj.io.stream; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Optional; import org.apache.solr.client.solrj.SolrServerException; @@ -298,7 +296,7 @@ public class UpdateStream extends TupleStream implements Expressible { private SolrInputDocument convertTupleToSolrDocument(Tuple tuple) { SolrInputDocument doc = new SolrInputDocument(); - for (Object field : tuple.fields.keySet()) { + for (Object field : tuple.getFields().keySet()) { if (! 
(field.equals(CommonParams.VERSION_FIELD) && pruneVersionField)) { Object value = tuple.get(field); @@ -347,16 +345,16 @@ public class UpdateStream extends TupleStream implements Expressible { private Tuple createBatchSummaryTuple(int batchSize) { assert batchSize > 0; - Map m = new HashMap(); + Tuple tuple = new Tuple(); this.totalDocsIndex += batchSize; ++batchNumber; - m.put(BATCH_INDEXED_FIELD_NAME, batchSize); - m.put("totalIndexed", this.totalDocsIndex); - m.put("batchNumber", batchNumber); - if(coreName != null) { - m.put("worker", coreName); + tuple.put(BATCH_INDEXED_FIELD_NAME, batchSize); + tuple.put("totalIndexed", this.totalDocsIndex); + tuple.put("batchNumber", batchNumber); + if (coreName != null) { + tuple.put("worker", coreName); } - return new Tuple(m); + return tuple; } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java index a85c33efea3..ea6606c44e9 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ZplotStream.java @@ -111,10 +111,7 @@ public class ZplotStream extends TupleStream implements Expressible { if(out.hasNext()) { return out.next(); } else { - Map m = new HashMap(); - m.put("EOF", true); - Tuple t = new Tuple(m); - return t; + return Tuple.EOF(); } } @@ -198,7 +195,7 @@ public class ZplotStream extends TupleStream implements Expressible { if(!table && !distribution && !clusters && !heat) { //Handle the vectors for (int i = 0; i < numTuples; i++) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); for (Map.Entry entry : evaluated.entrySet()) { List l = (List) entry.getValue(); tuple.put(entry.getKey(), l.get(i)); @@ -208,7 +205,7 @@ public class ZplotStream extends TupleStream implements Expressible { } //Generate the x axis if the tuples contain y and not x - if (outTuples.get(0).fields.containsKey("y") && !outTuples.get(0).fields.containsKey("x")) { + if (outTuples.get(0).getFields().containsKey("y") && !outTuples.get(0).getFields().containsKey("x")) { int x = 0; for (Tuple tuple : outTuples) { tuple.put("x", x++); @@ -224,7 +221,7 @@ public class ZplotStream extends TupleStream implements Expressible { clusterNum++; List points = c.getPoints(); for (KmeansEvaluator.ClusterPoint p : points) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put("x", p.getPoint()[0]); tuple.put("y", p.getPoint()[1]); tuple.put("cluster", "cluster" + clusterNum); @@ -239,7 +236,7 @@ public class ZplotStream extends TupleStream implements Expressible { clusterNum++; List points = c.getPoints(); for (DbscanEvaluator.ClusterPoint p : points) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put("x", p.getPoint()[0]); tuple.put("y", p.getPoint()[1]); tuple.put("cluster", "cluster" + clusterNum); @@ -269,7 +266,7 @@ public class ZplotStream extends TupleStream implements Expressible { } for (int i = 0; i < x.length; i++) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); if(!Double.isNaN(x[i])) { tuple.put("x", Precision.round(x[i], 2)); if(y[i] == Double.NEGATIVE_INFINITY || y[i] == Double.POSITIVE_INFINITY) { @@ -302,7 +299,7 @@ public class ZplotStream extends TupleStream implements Expressible { } for (int i = 0; i < x.length; i++) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put("x", x[i]); tuple.put("y", y[i]); 
outTuples.add(tuple); @@ -312,16 +309,16 @@ public class ZplotStream extends TupleStream implements Expressible { if(list.get(0) instanceof Tuple) { List tlist = (List)o; Tuple tuple = tlist.get(0); - if(tuple.fields.containsKey("N")) { + if(tuple.getFields().containsKey("N")) { for(Tuple t : tlist) { - Tuple outtuple = new Tuple(new HashMap()); + Tuple outtuple = new Tuple(); outtuple.put("x", Precision.round(((double)t.get("mean")), 2)); outtuple.put("y", t.get("prob")); outTuples.add(outtuple); } - } else if(tuple.fields.containsKey("count")) { + } else if(tuple.getFields().containsKey("count")) { for(Tuple t : tlist) { - Tuple outtuple = new Tuple(new HashMap()); + Tuple outtuple = new Tuple(); outtuple.put("x", t.get("value")); outtuple.put("y", t.get("pct")); outTuples.add(outtuple); @@ -344,7 +341,7 @@ public class ZplotStream extends TupleStream implements Expressible { } else { rowLabel = Integer.toString(i); } - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put("rowLabel", rowLabel); double[] row = data[i]; for (int j = 0; j < row.length; j++) { @@ -378,7 +375,7 @@ public class ZplotStream extends TupleStream implements Expressible { double[] row = data[i]; for (int j = 0; j < row.length; j++) { - Tuple tuple = new Tuple(new HashMap()); + Tuple tuple = new Tuple(); tuple.put("y", rowLabel); String colLabel = null; if (colLabels != null) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java index 4e176dd212c..451add8a2ab 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/expr/StreamFactory.java @@ -45,17 +45,23 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric; */ public class StreamFactory implements Serializable { - private transient HashMap collectionZkHosts; - private transient HashMap>> functionNames; + private transient HashMap collectionZkHosts; + private transient HashMap>> functionNames; private transient String defaultZkHost; private transient String defaultCollection; + private transient String defaultSort; public StreamFactory(){ collectionZkHosts = new HashMap<>(); functionNames = new HashMap<>(); } + + public StreamFactory(HashMap>> functionNames) { + this.functionNames = functionNames; + collectionZkHosts = new HashMap<>(); + } - public StreamFactory withCollectionZkHost(String collectionName, String zkHost){ + public StreamFactory withCollectionZkHost(String collectionName, String zkHost) { this.collectionZkHosts.put(collectionName, zkHost); this.defaultCollection = collectionName; return this; @@ -70,12 +76,27 @@ public class StreamFactory implements Serializable { return this; } + public Object clone() { + //Shallow copy + StreamFactory clone = new StreamFactory(functionNames); + return clone.withCollectionZkHost(defaultCollection, defaultZkHost).withDefaultSort(defaultSort); + } + + public StreamFactory withDefaultSort(String sort) { + this.defaultSort = sort; + return this; + } + + public String getDefaultSort() { + return this.defaultSort; + } + public String getDefaultZkHost() { return this.defaultZkHost; } - public String getCollectionZkHost(String collectionName){ - if(this.collectionZkHosts.containsKey(collectionName)){ + public String getCollectionZkHost(String collectionName) { + if (this.collectionZkHosts.containsKey(collectionName)) { return 
this.collectionZkHosts.get(collectionName); } return null; @@ -84,104 +105,104 @@ public class StreamFactory implements Serializable { public Map>> getFunctionNames() { return Collections.unmodifiableMap(functionNames); } - public StreamFactory withFunctionName(String functionName, Class clazz){ + + public StreamFactory withFunctionName(String functionName, Class clazz) { this.functionNames.put(functionName, () -> clazz); return this; } - public StreamFactory withFunctionName(String functionName, Supplier< Class> clazz){ + public StreamFactory withFunctionName(String functionName, Supplier< Class> clazz) { this.functionNames.put(functionName, clazz); return this; } + public StreamFactory withoutFunctionName(String functionName) { + this.functionNames.remove(functionName); + return this; + } - public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){ - if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){ + public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex) { + if (null == expression.getParameters() || parameterIndex >= expression.getParameters().size()) { return null; } - return expression.getParameters().get(parameterIndex); } - public List getValueOperands(StreamExpression expression){ + public List getValueOperands(StreamExpression expression) { return getOperandsOfType(expression, StreamExpressionValue.class).stream().map(item -> ((StreamExpressionValue) item).getValue()).collect(Collectors.toList()); } /** Given an expression, will return the value parameter at the given index, or null if doesn't exist */ - public String getValueOperand(StreamExpression expression, int parameterIndex){ + public String getValueOperand(StreamExpression expression, int parameterIndex) { StreamExpressionParameter parameter = getOperand(expression, parameterIndex); - if(null != parameter){ - if(parameter instanceof StreamExpressionValue){ + if (null != parameter) { + if (parameter instanceof StreamExpressionValue) { return ((StreamExpressionValue)parameter).getValue(); - } else if(parameter instanceof StreamExpression) { + } else if (parameter instanceof StreamExpression) { return parameter.toString(); } } - return null; } - public List getNamedOperands(StreamExpression expression){ + public List getNamedOperands(StreamExpression expression) { List namedParameters = new ArrayList<>(); - for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)){ - namedParameters.add((StreamExpressionNamedParameter)parameter); + for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpressionNamedParameter.class)) { + namedParameters.add((StreamExpressionNamedParameter) parameter); } - return namedParameters; } - public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name){ + + public StreamExpressionNamedParameter getNamedOperand(StreamExpression expression, String name) { List namedParameters = getNamedOperands(expression); - for(StreamExpressionNamedParameter param : namedParameters){ - if(param.getName().equals(name)){ + for (StreamExpressionNamedParameter param : namedParameters) { + if (param.getName().equals(name)) { return param; } } - return null; } - public List getExpressionOperands(StreamExpression expression){ + public List getExpressionOperands(StreamExpression expression) { List namedParameters = new ArrayList<>(); - for(StreamExpressionParameter parameter : 
getOperandsOfType(expression, StreamExpression.class)){ - namedParameters.add((StreamExpression)parameter); + for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)) { + namedParameters.add((StreamExpression) parameter); } - return namedParameters; } - public List getExpressionOperands(StreamExpression expression, String functionName){ + + public List getExpressionOperands(StreamExpression expression, String functionName) { List namedParameters = new ArrayList<>(); - for(StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)){ - StreamExpression expressionOperand = (StreamExpression)parameter; - if(expressionOperand.getFunctionName().equals(functionName)){ + for (StreamExpressionParameter parameter : getOperandsOfType(expression, StreamExpression.class)) { + StreamExpression expressionOperand = (StreamExpression) parameter; + if (expressionOperand.getFunctionName().equals(functionName)) { namedParameters.add(expressionOperand); } } - return namedParameters; } - public List getOperandsOfType(StreamExpression expression, Class ... clazzes){ + + public List getOperandsOfType(StreamExpression expression, Class ... clazzes) { List parameters = new ArrayList<>(); parameterLoop: - for(StreamExpressionParameter parameter : expression.getParameters()){ - for(Class clazz : clazzes){ - if(!clazz.isAssignableFrom(parameter.getClass())){ + for (StreamExpressionParameter parameter : expression.getParameters()) { + for (Class clazz : clazzes) { + if (!clazz.isAssignableFrom(parameter.getClass())) { continue parameterLoop; // go to the next parameter since this parameter cannot be assigned to at least one of the classes } } - parameters.add(parameter); } - return parameters; } - public List getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes){ + public List getExpressionOperandsRepresentingTypes(StreamExpression expression, Class ... clazzes) { List matchingStreamExpressions = new ArrayList<>(); List allStreamExpressions = getExpressionOperands(expression); parameterLoop: - for(StreamExpression streamExpression : allStreamExpressions) { + for (StreamExpression streamExpression : allStreamExpressions) { Supplier> classSupplier = functionNames.get(streamExpression.getFunctionName()); if (classSupplier != null) { for (Class clazz : clazzes) { @@ -189,180 +210,170 @@ public class StreamFactory implements Serializable { continue parameterLoop; } } - matchingStreamExpressions.add(streamExpression); } } - return matchingStreamExpressions; } - public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){ + public boolean doesRepresentTypes(StreamExpression expression, Class ... 
clazzes) { Supplier> classSupplier = functionNames.get(expression.getFunctionName()); - if(classSupplier != null){ - for(Class clazz : clazzes){ - if(!clazz.isAssignableFrom(classSupplier.get())){ + if (classSupplier != null) { + for (Class clazz : clazzes) { + if (!clazz.isAssignableFrom(classSupplier.get())) { return false; } } return true; } - return false; } - public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException{ + public int getIntOperand(StreamExpression expression, String paramName, Integer defaultValue) throws IOException { StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); - if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ - if(null != defaultValue){ + if (null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)) { + if (null != defaultValue) { return defaultValue; } throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type integer but didn't find one",expression, paramName)); } - String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); - try{ + String nStr = ((StreamExpressionValue) param.getParameter()).getValue(); + try { return Integer.parseInt(nStr); - } - catch(NumberFormatException e){ - if(null != defaultValue){ + } catch (NumberFormatException e) { + if (null != defaultValue) { return defaultValue; } - throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.",expression, paramName, nStr)); + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - %s '%s' is not a valid integer.", expression, paramName, nStr)); } } - public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException{ + public boolean getBooleanOperand(StreamExpression expression, String paramName, Boolean defaultValue) throws IOException { StreamExpressionNamedParameter param = getNamedOperand(expression, paramName); - if(null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)){ - if(null != defaultValue){ + if (null == param || null == param.getParameter() || !(param.getParameter() instanceof StreamExpressionValue)) { + if (null != defaultValue) { return defaultValue; } - throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one",expression, paramName)); + throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting a single '%s' parameter of type boolean but didn't find one", expression, paramName)); } - String nStr = ((StreamExpressionValue)param.getParameter()).getValue(); + String nStr = ((StreamExpressionValue) param.getParameter()).getValue(); return Boolean.parseBoolean(nStr); } - public TupleStream constructStream(String expressionClause) throws IOException { return constructStream(StreamExpressionParser.parse(expressionClause)); } - public TupleStream constructStream(StreamExpression expression) throws IOException{ + public TupleStream constructStream(StreamExpression expression) throws IOException { String function = expression.getFunctionName(); Supplier> classSupplier = functionNames.get(function); - if(classSupplier != null){ + if (classSupplier != null) { Class clazz = classSupplier.get(); - if(Expressible.class.isAssignableFrom(clazz) && 
TupleStream.class.isAssignableFrom(clazz)){ + if (Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)) { return (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); } } - throw new IOException(String.format(Locale.ROOT,"Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName())); + throw new IOException(String.format(Locale.ROOT, "Invalid stream expression %s - function '%s' is unknown (not mapped to a valid TupleStream)", expression, expression.getFunctionName())); } public Metric constructMetric(String expressionClause) throws IOException { return constructMetric(StreamExpressionParser.parse(expressionClause)); } - public Metric constructMetric(StreamExpression expression) throws IOException{ + + public Metric constructMetric(StreamExpression expression) throws IOException { String function = expression.getFunctionName(); Supplier> classSupplier = functionNames.get(function); - if(classSupplier != null){ + if (classSupplier != null) { Class clazz = classSupplier.get(); - if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){ + if (Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)) { return (Metric)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); } } - throw new IOException(String.format(Locale.ROOT,"Invalid metric expression %s - function '%s' is unknown (not mapped to a valid Metric)", expression, expression.getFunctionName())); + throw new IOException(String.format(Locale.ROOT, "Invalid metric expression %s - function '%s' is unknown (not mapped to a valid Metric)", expression, expression.getFunctionName())); } public StreamComparator constructComparator(String comparatorString, Class comparatorType) throws IOException { - if(comparatorString.contains(",")){ + if (comparatorString.contains(",")) { String[] parts = comparatorString.split(","); StreamComparator[] comps = new StreamComparator[parts.length]; - for(int idx = 0; idx < parts.length; ++idx){ + for (int idx = 0; idx < parts.length; ++idx) { comps[idx] = constructComparator(parts[idx].trim(), comparatorType); } return new MultipleFieldComparator(comps); - } - else if(comparatorString.contains("=")){ + } else if (comparatorString.contains("=")) { // expected format is "left=right order" String[] parts = comparatorString.split("[ =]"); - if(parts.length < 3){ - throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString)); + if (parts.length < 3) { + throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'left=right order'",comparatorString)); } String leftFieldName = null; String rightFieldName = null; String order = null; - for(String part : parts){ + for (String part : parts) { // skip empty - if(null == part || 0 == part.trim().length()){ continue; } + if (null == part || 0 == part.trim().length()) { continue; } // assign each in order - if(null == leftFieldName){ + if (null == leftFieldName) { leftFieldName = part.trim(); - } - else if(null == rightFieldName){ + } else if (null == rightFieldName) { rightFieldName = part.trim(); - } - else { + } else { order = part.trim(); break; // we're done, stop looping } } - if(null == leftFieldName || null == rightFieldName || null == order){ - throw new 
IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'left=right order'",comparatorString)); + if (null == leftFieldName || null == rightFieldName || null == order) { + throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'left=right order'",comparatorString)); } - return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) }); - } - else{ + return (StreamComparator) createInstance(comparatorType, new Class[]{ String.class, String.class, ComparatorOrder.class }, new Object[]{ leftFieldName, rightFieldName, ComparatorOrder.fromString(order) }); + } else { // expected format is "field order" String[] parts = comparatorString.split(" "); - if(2 != parts.length){ - throw new IOException(String.format(Locale.ROOT,"Invalid comparator expression %s - expecting 'field order'",comparatorString)); + if (2 != parts.length) { + throw new IOException(String.format(Locale.ROOT, "Invalid comparator expression %s - expecting 'field order'",comparatorString)); } String fieldName = parts[0].trim(); String order = parts[1].trim(); - return (StreamComparator)createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) }); + return (StreamComparator) createInstance(comparatorType, new Class[]{ String.class, ComparatorOrder.class }, new Object[]{ fieldName, ComparatorOrder.fromString(order) }); } } public StreamEqualitor constructEqualitor(String equalitorString, Class equalitorType) throws IOException { - if(equalitorString.contains(",")){ + if (equalitorString.contains(",")) { String[] parts = equalitorString.split(","); StreamEqualitor[] eqs = new StreamEqualitor[parts.length]; - for(int idx = 0; idx < parts.length; ++idx){ + for (int idx = 0; idx < parts.length; ++idx) { eqs[idx] = constructEqualitor(parts[idx].trim(), equalitorType); } return new MultipleFieldEqualitor(eqs); - } - else{ + } else { String leftFieldName; String rightFieldName; - if(equalitorString.contains("=")){ + if (equalitorString.contains("=")) { String[] parts = equalitorString.split("="); - if(2 != parts.length){ - throw new IOException(String.format(Locale.ROOT,"Invalid equalitor expression %s - expecting fieldName=fieldName",equalitorString)); + if (2 != parts.length) { + throw new IOException(String.format(Locale.ROOT, "Invalid equalitor expression %s - expecting fieldName=fieldName",equalitorString)); } leftFieldName = parts[0].trim(); rightFieldName = parts[1].trim(); - } - else{ + } else { leftFieldName = rightFieldName = equalitorString.trim(); } - return (StreamEqualitor)createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName }); + return (StreamEqualitor) createInstance(equalitorType, new Class[]{ String.class, String.class }, new Object[]{ leftFieldName, rightFieldName }); } } @@ -386,18 +397,19 @@ public class StreamFactory implements Serializable { public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException { return constructEvaluator(StreamExpressionParser.parse(expressionClause)); } - public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{ + + public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression 
expression) throws IOException { String function = expression.getFunctionName(); Supplier> classSupplier = functionNames.get(function); - if(classSupplier != null){ + if (classSupplier != null) { Class clazz = classSupplier.get(); - if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){ + if (Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)) { return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); } } - throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName())); + throw new IOException(String.format(Locale.ROOT, "Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName())); } public boolean isStream(StreamExpression expression) throws IOException { @@ -452,13 +464,13 @@ public class StreamFactory implements Serializable { throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName())); } - public Object constructPrimitiveObject(String original){ + public Object constructPrimitiveObject(String original) { String lower = original.trim().toLowerCase(Locale.ROOT); - if("null".equals(lower)){ return null; } - if("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); } - try{ return Long.valueOf(original); } catch(Exception ignored){}; - try{ return Double.valueOf(original); } catch(Exception ignored){}; + if ("null".equals(lower)) { return null; } + if ("true".equals(lower) || "false".equals(lower)){ return Boolean.parseBoolean(lower); } + try { return Long.valueOf(original); } catch(Exception ignored) { }; + try { return Double.valueOf(original); } catch(Exception ignored) { }; // is a string return original; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java index 61b83398e30..093b95e9dc8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java @@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; public class CountMetric extends Metric { private String columnName; private long count; + private boolean isAllColumns; public CountMetric() { this("*"); @@ -56,12 +57,13 @@ public class CountMetric extends Metric { private void init(String functionName, String columnName){ this.columnName = columnName; + this.isAllColumns = "*".equals(this.columnName); setFunctionName(functionName); setIdentifier(functionName, "(", columnName, ")"); } private boolean isAllColumns() { - return "*".equals(this.columnName); + return isAllColumns; } public void update(Tuple tuple) { diff --git a/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java b/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java new file mode 100644 index 00000000000..417b8495e73 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/params/StreamParams.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.params; + +/** + * Streaming Expressions Parameters and Properties. + */ +public interface StreamParams { + + // parameters + String EXPR = "expr"; + + // stream properties + String TUPLE = "tuple"; + String DOCS = "docs"; + String RETURN_VALUE = "return-value"; + String RESULT_SET = "result-set"; + + // tuple properties + String RESPONSE_TIME = "RESPONSE_TIME"; + String EOF = "EOF"; + String EXCEPTION = "EXCEPTION"; + String METRICS = "_METRICS_"; + + // other common tuple properties + String P_VALUE = "p-value"; +} diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java index a452465fa40..e5426fd970f 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java @@ -522,7 +522,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase { final List tuples = getTuples(daemonCheck); assertEquals(1, tuples.size()); // our daemon; if (log.isInfoEnabled()) { - log.info("Current daemon status: {}", tuples.get(0).fields); + log.info("Current daemon status: {}", tuples.get(0).getFields()); } assertEquals(daemonId + " should have never had a successful iteration", Long.valueOf(0L), tuples.get(0).getLong("iterations")); @@ -808,7 +808,7 @@ public class CloudAuthStreamTest extends SolrCloudTestCase { log.trace("TupleStream: {}", tupleStream); tupleStream.open(); for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) { - log.trace("Tuple: {}", t.fields); + log.trace("Tuple: {}", t.getFields()); tuples.add(t); } } finally { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java index 2fbe1011e4e..8b74a66b579 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/JDBCStreamTest.java @@ -642,7 +642,7 @@ public class JDBCStreamTest extends SolrCloudTestCase { protected boolean assertFields(List tuples, String ... fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(!tuple.fields.containsKey(field)){ + if(!tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field)); } } @@ -653,7 +653,7 @@ public class JDBCStreamTest extends SolrCloudTestCase { protected boolean assertNotFields(List tuples, String ... 
fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(tuple.fields.containsKey(field)){ + if(tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field)); } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java index cf86691e5a0..744632b9061 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java @@ -167,7 +167,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase { protected boolean assertFields(List tuples, String ... fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(!tuple.fields.containsKey(field)){ + if(!tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field)); } } @@ -177,7 +177,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase { protected boolean assertNotFields(List tuples, String ... fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(tuple.fields.containsKey(field)){ + if(tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field)); } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 73f6f9d3235..f57f655a45f 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -2296,7 +2296,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase { tuples = getTuples(stream); assertEquals(1, tuples.size()); - assertFalse(tuples.get(0).fields.containsKey("extra_s")); + assertFalse(tuples.get(0).getFields().containsKey("extra_s")); } finally { solrClientCache.close(); @@ -4467,7 +4467,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase { protected boolean assertFields(List tuples, String ... fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(!tuple.fields.containsKey(field)){ + if(!tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field)); } } @@ -4477,7 +4477,7 @@ public class StreamDecoratorTest extends SolrCloudTestCase { protected boolean assertNotFields(List tuples, String ... 
fields) throws Exception{ for(Tuple tuple : tuples){ for(String field : fields){ - if(tuple.fields.containsKey(field)){ + if(tuple.getFields().containsKey(field)){ throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field)); } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 7e5da9d5b54..7288e7aacf6 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -674,7 +674,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { solrStream = new SolrStream(jetty.getBaseUrl().toString() + "/collection1", sParams); tuples4 = getTuples(solrStream); assert(tuples4.size() == 500); - Map fields = tuples4.get(0).fields; + Map fields = tuples4.get(0).getFields(); assert(fields.containsKey("id")); assert(fields.containsKey("a_f")); assert(fields.containsKey("a_i")); @@ -3028,7 +3028,7 @@ public class StreamExpressionTest extends SolrCloudTestCase { tuples = getTuples(stream); assertEquals(100, tuples.size()); Tuple lastModel = tuples.get(0); - ClassificationEvaluation evaluation = ClassificationEvaluation.create(lastModel.fields); + ClassificationEvaluation evaluation = ClassificationEvaluation.create(lastModel.getFields()); assertTrue(evaluation.getF1() >= 1.0); assertEquals(Math.log(5000.0 / (2500 + 1)), lastModel.getDoubles("idfs_ds").get(0), 0.0001); // make sure the tuples is retrieved in correct order
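
For illustration only, not part of the patch: a minimal sketch of how the reworked Tuple API that the hunks above migrate to is meant to be consumed. It assumes only the no-arg Tuple constructor, put(), getFields(), the Tuple.EOF() factory and the StreamParams.EOF constant exactly as they appear in this diff; the TupleApiSketch class and its method names are hypothetical.

    import java.util.Iterator;

    import org.apache.solr.client.solrj.io.Tuple;
    import org.apache.solr.common.params.StreamParams;

    public class TupleApiSketch {

      // Build a tuple with the no-arg constructor plus put(), instead of
      // handing a pre-populated HashMap to the Tuple constructor.
      static Tuple batchSummary(int batchSize, long totalIndexed) {
        Tuple tuple = new Tuple();
        tuple.put("batchIndexed", batchSize);
        tuple.put("totalIndexed", totalIndexed);
        return tuple;
      }

      // End-of-stream is signalled with the Tuple.EOF() factory rather than a
      // hand-built map containing an "EOF" key.
      static Tuple nextOrEof(Iterator<Tuple> tuples) {
        return tuples.hasNext() ? tuples.next() : Tuple.EOF();
      }

      // Callers go through getFields() instead of reaching into the fields map
      // directly; StreamParams.EOF names the end-of-stream marker key.
      static boolean isEof(Tuple tuple) {
        return tuple.getFields().containsKey(StreamParams.EOF);
      }
    }

The point of the new StreamParams interface is the same: it gathers the magic strings used as tuple properties ("EOF", "EXCEPTION", "RESPONSE_TIME", and so on) in one place, so the /export handler and the SolrJ streaming classes agree on property names instead of repeating string literals.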