diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index 99f5cc30323..bf373837fed 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -167,6 +167,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       .withFunctionName("get", GetStream.class)
       .withFunctionName("timeseries", TimeSeriesStream.class)
       .withFunctionName("tuple", TupStream.class)
+      .withFunctionName("sql", SqlStream.class)
       .withFunctionName("col", ColumnEvaluator.class)
       .withFunctionName("predict", PredictEvaluator.class)
       .withFunctionName("regress", RegressionEvaluator.class)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
new file mode 100644
index 00000000000..5a81b7466fb
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SqlStream.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
+
+/**
+ * Stream source that sends its parameters (which must include {@code stmt}) to the {@code /sql}
+ * handler of one randomly chosen shard URL of a collection and reads the tuples it returns.
+ */
+public class SqlStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+
+  protected String zkHost;
+  protected String collection;
+  protected SolrParams params;
+  protected transient CloudSolrClient cloudSolrClient;
+  protected transient TupleStream tupleStream;
+  protected transient StreamContext streamContext;
+
+  /**
+   * @param zkHost Zookeeper ensemble connection string
+   * @param collectionName Name of the collection to operate on
+   * @param params Request parameters; must contain a {@code stmt} parameter
+   * @throws IOException if the {@code stmt} parameter is missing
+   */
+  public SqlStream(String zkHost, String collectionName, SolrParams params) throws IOException {
+    init(collectionName, zkHost, params);
+  }
+
+  /**
+   * Builds the stream from a parsed expression of the form
+   * {@code sql(collectionName, stmt="...", [zkHost="..."])}.
+   *
+   * @param expression the parsed streaming expression
+   * @param factory factory used to read the operands and to look up a zkHost when none is given
+   * @throws IOException if the expression is malformed or has no {@code stmt} parameter
+   */
+  public SqlStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    // grab all parameters out
+    String collectionName = factory.getValueOperand(expression, 0);
+    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
+    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
+
+    // Collection Name
+    if (null == collectionName) {
+      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - collectionName expected as first operand", expression));
+    }
+
+    // Validate there are no unknown parameters: every operand must be either the collection name
+    // or a named parameter (zkHost is itself a named parameter, so it is not counted twice)
+    if (expression.getParameters().size() != 1 + namedParams.size()) {
+      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - unknown operands found", expression));
+    }
+
+    // Named parameters - passed directly to solr as solrparams
+    if (namedParams.isEmpty()) {
+      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - at least one named parameter expected. eg. 'q=*:*'", expression));
+    }
+
+    ModifiableSolrParams mParams = new ModifiableSolrParams();
+    for (StreamExpressionNamedParameter namedParam : namedParams) {
+      if (!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("aliases")) {
+        mParams.add(namedParam.getName(), namedParam.getParameter().toString().trim());
+      }
+    }
+
+    // zkHost, optional - if not provided then will look into factory list to get
+    String zkHost = null;
+    if (null == zkHostExpression) {
+      zkHost = factory.getCollectionZkHost(collectionName);
+      if (zkHost == null) {
+        zkHost = factory.getDefaultZkHost();
+      }
+    } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
+      zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
+    }
+    // NOTE: zkHost may still be null here; no validation is performed at this point.
+
+    // We've got all the required items
+    init(collectionName, zkHost, mParams);
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException {
+    // functionName(collectionName, param1, param2, ..., paramN, [zkHost="..."])
+
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(getClass()));
+
+    // collection
+    expression.addParameter(collection);
+
+    // parameters
+    ModifiableSolrParams mParams = new ModifiableSolrParams(SolrParams.toMultiMap(params.toNamedList()));
+    for (Entry<String, String[]> param : mParams.getMap().entrySet()) {
+      String value = String.join(",", param.getValue());
+
+      // SOLR-8409: This is a special case where the params contain a " character
+      // Do note that in any other BASE streams with parameters where a " might come into play
+      // that this same replacement needs to take place.
+      value = value.replace("\"", "\\\"");
+
+      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), value));
+    }
+
+    // zkHost - only emitted when known, because the expression constructor allows it to be null
+    if (zkHost != null) {
+      expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
+    }
+
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_SOURCE);
+    explanation.setExpression(toExpression(factory).toString());
+
+    // child is a datastore so add it at this point
+    StreamExplanation child = new StreamExplanation(getStreamNodeId() + "-datastore");
+    child.setFunctionName(String.format(Locale.ROOT, "solr (%s)", collection));
+    child.setImplementingClass("Solr/Lucene");
+    child.setExpressionType(ExpressionType.DATASTORE);
+
+    if (null != params) {
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      // Values are String[]; join them so the text shows the values, not the array identity
+      child.setExpression(mParams.getMap().entrySet().stream()
+          .map(e -> String.format(Locale.ROOT, "%s=%s", e.getKey(), String.join(",", e.getValue())))
+          .collect(Collectors.joining(",")));
+    }
+    explanation.addChild(child);
+
+    return explanation;
+  }
+
+  /**
+   * Stores the stream configuration and verifies that a {@code stmt} parameter was supplied.
+   *
+   * @throws IOException if no {@code stmt} parameter is present
+   */
+  protected void init(String collectionName, String zkHost, SolrParams params) throws IOException {
+    this.zkHost = zkHost;
+    this.collection = collectionName;
+    this.params = new ModifiableSolrParams(params);
+
+    // Validate against the stored copy, which is what this stream will actually send
+    if (this.params.get("stmt") == null) {
+      throw new IOException("stmt param expected for sql function");
+    }
+  }
+
+  @Override
+  public void setStreamContext(StreamContext context) {
+    this.streamContext = context;
+  }
+
+  /**
+   * Builds the underlying stream and opens it.
+   */
+  @Override
+  public void open() throws IOException {
+    constructStream();
+    tupleStream.open();
+  }
+
+  /** This stream wraps no other expression streams, so it reports no children. */
+  @Override
+  public List<TupleStream> children() {
+    return new ArrayList<>();
+  }
+
+  /**
+   * Returns the active slices of a collection, matching the name exactly, then case-insensitively,
+   * then (when {@code checkAlias} is set) as a collection alias.
+   *
+   * @throws IOException if no matching collection or alias is found
+   */
+  public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
+
+    // Check collection case sensitive
+    if (collectionsMap.containsKey(collectionName)) {
+      return collectionsMap.get(collectionName).getActiveSlices();
+    }
+
+    // Check collection case insensitive
+    for (Entry<String, DocCollection> entry : collectionsMap.entrySet()) {
+      if (entry.getKey().equalsIgnoreCase(collectionName)) {
+        return entry.getValue().getActiveSlices();
+      }
+    }
+
+    if (checkAlias) {
+      // check for collection alias
+      Aliases aliases = zkStateReader.getAliases();
+      String alias = aliases.getCollectionAlias(collectionName);
+      if (alias != null) {
+        Collection<Slice> slices = new ArrayList<>();
+
+        List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+        for (String aliasCollectionName : aliasList) {
+          DocCollection aliased = collectionsMap.get(aliasCollectionName);
+          if (aliased == null) {
+            // the alias names a collection that is not present in the cluster state
+            throw new IOException("Slices not found for " + aliasCollectionName);
+          }
+          // Add all active slices for this alias collection
+          slices.addAll(aliased.getActiveSlices());
+        }
+
+        return slices;
+      }
+    }
+
+    throw new IOException("Slices not found for " + collectionName);
+  }
+
+  /**
+   * Picks one shard URL at random and prepares a {@link SolrStream} that sends the parameters to
+   * it with {@code qt=/sql}.
+   */
+  protected void constructStream() throws IOException {
+    try {
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
+      if (shardUrls.isEmpty()) {
+        throw new IOException("No shards found for " + this.collection);
+      }
+      Collections.shuffle(shardUrls);
+      String url = shardUrls.get(0);
+      ModifiableSolrParams mParams = new ModifiableSolrParams(params);
+      mParams.add(CommonParams.QT, "/sql");
+      this.tupleStream = new SolrStream(url, mParams);
+      if (streamContext != null) {
+        tupleStream.setStreamContext(streamContext);
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Closes the underlying stream, if one was constructed. */
+  @Override
+  public void close() throws IOException {
+    if (tupleStream != null) {
+      tupleStream.close();
+    }
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  @Override
+  public StreamComparator getStreamSort() {
+    return null;
+  }
+
+  @Override
+  public Tuple read() throws IOException {
+    return tupleStream.read();
+  }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
index 129de1ce161..e651338bf21 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java
@@ -242,6 +242,38 @@ public class StreamExpressionTest extends SolrCloudTestCase {
     }
   }
 
+  @Test
+  public void testSqlStream() throws Exception {
+
+    new UpdateRequest()
+        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
+        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
+        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
+        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
+        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
+        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+
+    List<Tuple> tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+    List<String> shardUrls = TupleStream.getShards(cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
+
+    try {
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.add("qt", "/stream");
+      solrParams.add("expr", "sql("+COLLECTIONORALIAS+", stmt=\"select id from collection1 order by a_i asc\")");
+      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
+      solrStream.setStreamContext(streamContext);
+      tuples = getTuples(solrStream);
+      assert (tuples.size() == 5);
+      assertOrder(tuples, 0, 1, 2, 3, 4);
+
+    } finally {
+      solrClientCache.close();
+    }
+  }
+