SOLR-9103: Restore ability for users to add custom Streaming Expressions

This commit is contained in:
Dennis Gove 2016-10-11 20:30:26 -04:00
parent 0f08ad9ad3
commit 56cd8bffc6
5 changed files with 253 additions and 28 deletions

View File

@ -125,6 +125,8 @@ New Features
* SOLR-9337: Add fetch Streaming Expression (Joel Bernstein)
* SOLR-9103: Restore ability for users to add custom Streaming Expressions (Cao Manh Dat)
Bug Fixes
----------------------

View File

@ -17,8 +17,17 @@
package org.apache.solr.core;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathConstants;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.PATH;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.core.ConfigOverlay.ZNODEVER;
import static org.apache.solr.core.SolrConfig.PluginOpts.LAZY;
import static org.apache.solr.core.SolrConfig.PluginOpts.MULTI_OK;
import static org.apache.solr.core.SolrConfig.PluginOpts.NOOP;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_CLASS;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME_IN_OVERLAY;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -42,10 +51,13 @@ import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.ImmutableList;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathConstants;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.Version;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.MapSerializable;
import org.apache.solr.common.SolrException;
@ -79,16 +91,7 @@ import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.PATH;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.core.ConfigOverlay.ZNODEVER;
import static org.apache.solr.core.SolrConfig.PluginOpts.LAZY;
import static org.apache.solr.core.SolrConfig.PluginOpts.MULTI_OK;
import static org.apache.solr.core.SolrConfig.PluginOpts.NOOP;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_CLASS;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME;
import static org.apache.solr.core.SolrConfig.PluginOpts.REQUIRE_NAME_IN_OVERLAY;
import com.google.common.collect.ImmutableList;
/**
@ -333,6 +336,7 @@ public class SolrConfig extends Config implements MapSerializable {
public static final List<SolrPluginInfo> plugins = ImmutableList.<SolrPluginInfo>builder()
.add(new SolrPluginInfo(SolrRequestHandler.class, SolrRequestHandler.TYPE, REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK, LAZY))
.add(new SolrPluginInfo(QParserPlugin.class, "queryParser", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
.add(new SolrPluginInfo(Expressible.class, "expressible", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
.add(new SolrPluginInfo(QueryResponseWriter.class, "queryResponseWriter", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK, LAZY))
.add(new SolrPluginInfo(ValueSourceParser.class, "valueSourceParser", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
.add(new SolrPluginInfo(TransformerFactory.class, "transformer", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))

View File

@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
@ -35,7 +34,36 @@ import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.CommitStream;
import org.apache.solr.client.solrj.io.stream.ComplementStream;
import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.FacetStream;
import org.apache.solr.client.solrj.io.stream.FeaturesSelectionStream;
import org.apache.solr.client.solrj.io.stream.FetchStream;
import org.apache.solr.client.solrj.io.stream.HashJoinStream;
import org.apache.solr.client.solrj.io.stream.InnerJoinStream;
import org.apache.solr.client.solrj.io.stream.IntersectStream;
import org.apache.solr.client.solrj.io.stream.JDBCStream;
import org.apache.solr.client.solrj.io.stream.LeftOuterJoinStream;
import org.apache.solr.client.solrj.io.stream.MergeStream;
import org.apache.solr.client.solrj.io.stream.ModelStream;
import org.apache.solr.client.solrj.io.stream.OuterHashJoinStream;
import org.apache.solr.client.solrj.io.stream.ParallelStream;
import org.apache.solr.client.solrj.io.stream.RankStream;
import org.apache.solr.client.solrj.io.stream.ReducerStream;
import org.apache.solr.client.solrj.io.stream.RollupStream;
import org.apache.solr.client.solrj.io.stream.ScoreNodesStream;
import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.SortStream;
import org.apache.solr.client.solrj.io.stream.StatsStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TextLogitStream;
import org.apache.solr.client.solrj.io.stream.TopicStream;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UniqueStream;
import org.apache.solr.client.solrj.io.stream.UpdateStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -50,9 +78,9 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@ -135,9 +163,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("gatherNodes", GatherNodesStream.class)
.withFunctionName("select", SelectStream.class)
.withFunctionName("scoreNodes", ScoreNodesStream.class)
.withFunctionName("model", ModelStream.class)
.withFunctionName("classify", ClassifyStream.class)
.withFunctionName("fetch", FetchStream.class)
.withFunctionName("model", ModelStream.class)
.withFunctionName("classify", ClassifyStream.class)
.withFunctionName("fetch", FetchStream.class)
// metrics
.withFunctionName("min", MinMetric.class)
@ -154,15 +182,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("group", GroupOperation.class)
.withFunctionName("distinct", DistinctOperation.class);
// This pulls all the overrides and additions from the config
Object functionMappingsObj = initArgs.get("streamFunctions");
if(null != functionMappingsObj){
NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
for(Entry<String,?> functionMapping : functionMappings){
Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), Expressible.class);
streamFactory.withFunctionName(functionMapping.getKey(), clazz);
}
}
// This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) {
Class<? extends Expressible> clazz = core.getResourceLoader().findClass(pluginInfo.className, Expressible.class);
streamFactory.withFunctionName(pluginInfo.name, clazz);
}
core.addCloseHook(new CloseHook() {
@Override

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class HelloStream extends TupleStream implements Expressible{
boolean isSentHelloWorld = false;
public HelloStream() {
}
public HelloStream(StreamExpression expression, StreamFactory factory) throws IOException{
}
@Override
public void setStreamContext(StreamContext context) {
}
@Override
public List<TupleStream> children() {
return null;
}
@Override
public void open() throws IOException {
}
@Override
public void close() throws IOException {
}
@Override
public Tuple read() throws IOException {
if (isSentHelloWorld) {
Map m = new HashMap();
m.put("EOF", true);
return new Tuple(m);
} else {
isSentHelloWorld = true;
Map m = new HashMap<>();
m.put("msg", "Hello World!");
return new Tuple(m);
}
}
@Override
public StreamComparator getStreamSort() {
return null;
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
return expression;
}
@Override
public Explanation toExplanation(StreamFactory factory) throws IOException {
return new StreamExplanation(getStreamNodeId().toString())
.withFunctionName("hello")
.withImplementingClass(this.getClass().getName())
.withExpressionType(Explanation.ExpressionType.STREAM_SOURCE)
.withExpression("--non-expressible--");
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.handler.TestBlobHandler;
import org.apache.solr.util.RestTestHarness;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Created by caomanhdat on 6/3/16.
*/
public class TestCustomStream extends AbstractFullDistribZkTestBase {
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
private void setupHarnesses() {
for (final SolrClient client : clients) {
RestTestHarness harness = new RestTestHarness(() -> ((HttpSolrClient)client).getBaseURL());
restTestHarnesses.add(harness);
}
}
@BeforeClass
public static void enableRuntimeLib() throws Exception {
System.setProperty("enable.runtime.lib", "true");
}
@Override
public void distribTearDown() throws Exception {
super.distribTearDown();
for (RestTestHarness r : restTestHarnesses) {
r.close();
}
}
@Test
public void testDynamicLoadingCustomStream() throws Exception {
System.setProperty("enable.runtime.lib", "true");
setupHarnesses();
String blobName = "colltest";
HttpSolrClient randomClient = (HttpSolrClient) clients.get(random().nextInt(clients.size()));
String baseURL = randomClient.getBaseURL();
baseURL = baseURL.substring(0, baseURL.lastIndexOf('/'));
TestBlobHandler.createSystemCollection(getHttpSolrClient(baseURL, randomClient.getHttpClient()));
waitForRecoveriesToFinish(".system", true);
String payload = "{\n" +
"'create-expressible' : { 'name' : 'hello', 'class': 'org.apache.solr.core.HelloStream' }\n" +
"}";
RestTestHarness client = restTestHarnesses.get(random().nextInt(restTestHarnesses.size()));
TestSolrConfigHandler.runConfigCommand(client,"/config?wt=json",payload);
TestSolrConfigHandler.testForResponseElement(client,
null,
"/config/overlay?wt=json",
null,
Arrays.asList("overlay", "expressible", "hello", "class"),
"org.apache.solr.core.HelloStream",10);
TestSolrConfigHandler.testForResponseElement(client,
null,
"/stream?expr=hello()",
null,
Arrays.asList("result-set", "docs[0]", "msg"),
"Hello World!",10);
}
}