mirror of https://github.com/apache/lucene.git
SOLR-10623: Add sql Streaming Expression
This commit is contained in:
parent
5a50887a4b
commit
f326cfb175
|
@ -167,6 +167,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
|
||||||
.withFunctionName("get", GetStream.class)
|
.withFunctionName("get", GetStream.class)
|
||||||
.withFunctionName("timeseries", TimeSeriesStream.class)
|
.withFunctionName("timeseries", TimeSeriesStream.class)
|
||||||
.withFunctionName("tuple", TupStream.class)
|
.withFunctionName("tuple", TupStream.class)
|
||||||
|
.withFunctionName("sql", SqlStream.class)
|
||||||
.withFunctionName("col", ColumnEvaluator.class)
|
.withFunctionName("col", ColumnEvaluator.class)
|
||||||
.withFunctionName("predict", PredictEvaluator.class)
|
.withFunctionName("predict", PredictEvaluator.class)
|
||||||
.withFunctionName("regress", RegressionEvaluator.class)
|
.withFunctionName("regress", RegressionEvaluator.class)
|
||||||
|
|
|
@ -0,0 +1,280 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.solr.client.solrj.io.stream;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.io.Tuple;
|
||||||
|
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
|
||||||
|
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||||
|
import org.apache.solr.common.cloud.Aliases;
|
||||||
|
import org.apache.solr.common.cloud.ClusterState;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.CommonParams;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.StrUtils;
|
||||||
|
|
||||||
|
public class SqlStream extends TupleStream implements Expressible {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1;
|
||||||
|
|
||||||
|
protected String zkHost;
|
||||||
|
protected String collection;
|
||||||
|
protected SolrParams params;
|
||||||
|
protected transient CloudSolrClient cloudSolrClient;
|
||||||
|
protected transient TupleStream tupleStream;
|
||||||
|
protected transient StreamContext streamContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param zkHost Zookeeper ensemble connection string
|
||||||
|
* @param collectionName Name of the collection to operate on
|
||||||
|
* @param params Map<String, String> of parameter/value pairs
|
||||||
|
* @throws IOException Something went wrong
|
||||||
|
* <p>
|
||||||
|
* This form does not allow specifying multiple clauses, say "fq" clauses, use the form that
|
||||||
|
* takes a SolrParams. Transition code can call the preferred method that takes SolrParams
|
||||||
|
* by calling CloudSolrStream(zkHost, collectionName,
|
||||||
|
* new ModifiableSolrParams(SolrParams.toMultiMap(new NamedList(Map<String, String>)));
|
||||||
|
* @deprecated Use the constructor that has a SolrParams obj rather than a Map
|
||||||
|
*/
|
||||||
|
|
||||||
|
public SqlStream(String zkHost, String collectionName, SolrParams params) throws IOException {
|
||||||
|
init(collectionName, zkHost, params);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SqlStream(StreamExpression expression, StreamFactory factory) throws IOException{
|
||||||
|
// grab all parameters out
|
||||||
|
String collectionName = factory.getValueOperand(expression, 0);
|
||||||
|
List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
|
||||||
|
StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
|
||||||
|
|
||||||
|
// Collection Name
|
||||||
|
if(null == collectionName){
|
||||||
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate there are no unknown parameters - zkHost and alias are namedParameter so we don't need to count it twice
|
||||||
|
if(expression.getParameters().size() != 1 + namedParams.size()){
|
||||||
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - unknown operands found",expression));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Named parameters - passed directly to solr as solrparams
|
||||||
|
if(0 == namedParams.size()){
|
||||||
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
|
||||||
|
}
|
||||||
|
|
||||||
|
ModifiableSolrParams mParams = new ModifiableSolrParams();
|
||||||
|
for(StreamExpressionNamedParameter namedParam : namedParams){
|
||||||
|
if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")){
|
||||||
|
mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// zkHost, optional - if not provided then will look into factory list to get
|
||||||
|
String zkHost = null;
|
||||||
|
if(null == zkHostExpression){
|
||||||
|
zkHost = factory.getCollectionZkHost(collectionName);
|
||||||
|
if(zkHost == null) {
|
||||||
|
zkHost = factory.getDefaultZkHost();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
|
||||||
|
zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
if(null == zkHost){
|
||||||
|
throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
// We've got all the required items
|
||||||
|
init(collectionName, zkHost, mParams);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamExpression toExpression(StreamFactory factory) throws IOException {
|
||||||
|
// functionName(collectionName, param1, param2, ..., paramN, sort="comp", [aliases="field=alias,..."])
|
||||||
|
|
||||||
|
// function name
|
||||||
|
StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
|
||||||
|
|
||||||
|
// collection
|
||||||
|
expression.addParameter(collection);
|
||||||
|
|
||||||
|
// parameters
|
||||||
|
|
||||||
|
ModifiableSolrParams mParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
|
||||||
|
for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
|
||||||
|
String value = String.join(",", param.getValue());
|
||||||
|
|
||||||
|
// SOLR-8409: This is a special case where the params contain a " character
|
||||||
|
// Do note that in any other BASE streams with parameters where a " might come into play
|
||||||
|
// that this same replacement needs to take place.
|
||||||
|
value = value.replace("\"", "\\\"");
|
||||||
|
|
||||||
|
expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), value));
|
||||||
|
}
|
||||||
|
|
||||||
|
// zkHost
|
||||||
|
expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
|
||||||
|
|
||||||
|
return expression;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Explanation toExplanation(StreamFactory factory) throws IOException {
|
||||||
|
|
||||||
|
StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
|
||||||
|
|
||||||
|
explanation.setFunctionName(factory.getFunctionName(this.getClass()));
|
||||||
|
explanation.setImplementingClass(this.getClass().getName());
|
||||||
|
explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
|
||||||
|
explanation.setExpression(toExpression(factory).toString());
|
||||||
|
|
||||||
|
// child is a datastore so add it at this point
|
||||||
|
StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
|
||||||
|
child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
|
||||||
|
child.setImplementingClass("Solr/Lucene");
|
||||||
|
child.setExpressionType(ExpressionType.DATASTORE);
|
||||||
|
|
||||||
|
if(null != params){
|
||||||
|
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
|
||||||
|
child.setExpression(mParams.getMap().entrySet().stream().map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), e.getValue())).collect(Collectors.joining(",")));
|
||||||
|
}
|
||||||
|
explanation.addChild(child);
|
||||||
|
|
||||||
|
return explanation;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
|
||||||
|
this.zkHost = zkHost;
|
||||||
|
this.collection = collectionName;
|
||||||
|
this.params = new ModifiableSolrParams(params);
|
||||||
|
|
||||||
|
// If the comparator is null then it was not explicitly set so we will create one using the sort parameter
|
||||||
|
// of the query. While doing this we will also take into account any aliases such that if we are sorting on
|
||||||
|
// fieldA but fieldA is aliased to alias.fieldA then the comparater will be against alias.fieldA.
|
||||||
|
|
||||||
|
if (params.get("stmt") == null) {
|
||||||
|
throw new IOException("stmt param expected for search function");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setStreamContext(StreamContext context) {
|
||||||
|
this.streamContext = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opens the CloudSolrStream
|
||||||
|
*
|
||||||
|
***/
|
||||||
|
public void open() throws IOException {
|
||||||
|
constructStream();
|
||||||
|
tupleStream.open();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<TupleStream> children() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
|
||||||
|
ClusterState clusterState = zkStateReader.getClusterState();
|
||||||
|
|
||||||
|
Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
|
||||||
|
|
||||||
|
// Check collection case sensitive
|
||||||
|
if(collectionsMap.containsKey(collectionName)) {
|
||||||
|
return collectionsMap.get(collectionName).getActiveSlices();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check collection case insensitive
|
||||||
|
for(String collectionMapKey : collectionsMap.keySet()) {
|
||||||
|
if(collectionMapKey.equalsIgnoreCase(collectionName)) {
|
||||||
|
return collectionsMap.get(collectionMapKey).getActiveSlices();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(checkAlias) {
|
||||||
|
// check for collection alias
|
||||||
|
Aliases aliases = zkStateReader.getAliases();
|
||||||
|
String alias = aliases.getCollectionAlias(collectionName);
|
||||||
|
if (alias != null) {
|
||||||
|
Collection<Slice> slices = new ArrayList<>();
|
||||||
|
|
||||||
|
List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
|
||||||
|
for (String aliasCollectionName : aliasList) {
|
||||||
|
// Add all active slices for this alias collection
|
||||||
|
slices.addAll(collectionsMap.get(aliasCollectionName).getActiveSlices());
|
||||||
|
}
|
||||||
|
|
||||||
|
return slices;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IOException("Slices not found for " + collectionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void constructStream() throws IOException {
|
||||||
|
try {
|
||||||
|
|
||||||
|
List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
|
||||||
|
Collections.shuffle(shardUrls);
|
||||||
|
String url = shardUrls.get(0);
|
||||||
|
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
|
||||||
|
mParams.add(CommonParams.QT, "/sql");
|
||||||
|
this.tupleStream = new SolrStream(url, mParams);
|
||||||
|
if(streamContext != null) {
|
||||||
|
tupleStream.setStreamContext(streamContext);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
tupleStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Return the stream sort - ie, the order in which records are returned */
|
||||||
|
public StreamComparator getStreamSort(){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Tuple read() throws IOException {
|
||||||
|
return tupleStream.read();
|
||||||
|
}
|
||||||
|
}
|
|
@ -242,6 +242,46 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSqlStream() throws Exception {
|
||||||
|
|
||||||
|
new UpdateRequest()
|
||||||
|
.add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
|
||||||
|
.add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
|
||||||
|
.add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
|
||||||
|
.add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
|
||||||
|
.add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
|
||||||
|
.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
|
||||||
|
|
||||||
|
List<Tuple> tuples;
|
||||||
|
StreamContext streamContext = new StreamContext();
|
||||||
|
SolrClientCache solrClientCache = new SolrClientCache();
|
||||||
|
streamContext.setSolrClientCache(solrClientCache);
|
||||||
|
List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
|
||||||
|
|
||||||
|
try {
|
||||||
|
StringBuilder buf = new StringBuilder();
|
||||||
|
for (String shardUrl : shardUrls) {
|
||||||
|
if (buf.length() > 0) {
|
||||||
|
buf.append(",");
|
||||||
|
}
|
||||||
|
buf.append(shardUrl);
|
||||||
|
}
|
||||||
|
|
||||||
|
ModifiableSolrParams solrParams = new ModifiableSolrParams();
|
||||||
|
solrParams.add("qt", "/stream");
|
||||||
|
solrParams.add("expr", "sql("+COLLECTIONORALIAS+", stmt=\"select id from collection1 order by a_i asc\")");
|
||||||
|
SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
|
||||||
|
solrStream.setStreamContext(streamContext);
|
||||||
|
tuples = getTuples(solrStream);
|
||||||
|
assert (tuples.size() == 5);
|
||||||
|
assertOrder(tuples, 0, 1, 2, 3, 4);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
solrClientCache.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue