SOLR-14125 : Streaming expressions to be loadable from packages (#1108)

SOLR-14125: Make <expressible> plugins work with packages
This commit is contained in:
Noble Paul 2019-12-23 15:20:26 +11:00 committed by GitHub
parent 93309e9728
commit ef15ae9805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 229 additions and 92 deletions

View File

@ -29,8 +29,8 @@ import java.util.TreeMap;
import org.apache.lucene.analysis.ja.util.CSVUtil;
import org.apache.lucene.util.IntsRefBuilder;
import org.apache.lucene.util.fst.FSTCompiler;
import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.FSTCompiler;
import org.apache.lucene.util.fst.PositiveIntOutputs;
/**

View File

@ -161,6 +161,8 @@ Improvements
* SOLR-14095: Replace Java serialization with Javabin in the Overseer queues (Tomás Fernández Löbbe)
* SOLR-14125: Make <expressible> plugins work with packages (noble)
Optimizations
---------------------
(No changes)

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -27,7 +28,9 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.Traversal;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -38,6 +41,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@ -90,11 +95,21 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
Object functionMappingsObj = initArgs.get("streamFunctions");
if(null != functionMappingsObj){
NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
for(Entry<String,?> functionMapping : functionMappings){
Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(),
for(Entry<String,?> functionMapping : functionMappings) {
String key = functionMapping.getKey();
PluginInfo pluginInfo = new PluginInfo(key, Collections.singletonMap("class", functionMapping.getValue()));
if (pluginInfo.pkgName == null) {
Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String) functionMapping.getValue(),
Expressible.class);
streamFactory.withFunctionName(functionMapping.getKey(), clazz);
streamFactory.withFunctionName(key, clazz);
} else {
StreamHandler.ExpressibleHolder holder = new StreamHandler.ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
streamFactory.withFunctionName(key, () -> holder.getClazz());
}
}
}
}

View File

@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
@ -588,9 +589,17 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
private boolean verifyClass(CommandOperation op, String clz, Class expected) {
if (clz == null) return true;
if (!"true".equals(String.valueOf(op.getStr("runtimeLib", null)))) {
PluginInfo info = new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap());
//this is not dynamically loaded so we can verify the class right away
try {
req.getCore().createInitInstance(new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap()), expected, clz, "");
if(expected == Expressible.class) {
SolrResourceLoader resourceLoader = info.pkgName == null ?
req.getCore().getResourceLoader() :
req.getCore().getResourceLoader(info.pkgName);
resourceLoader.findClass(info.className, expected);
} else {
req.getCore().createInitInstance(info, expected, clz, "");
}
} catch (Exception e) {
log.error("Error checking plugin : ", e);
op.addError(e.getMessage());

View File

@ -33,7 +33,10 @@ import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -44,6 +47,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
@ -52,7 +56,10 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.pkg.PackageLoader;
import org.apache.solr.pkg.PackagePluginHolder;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
@ -87,7 +94,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
private Map<String,DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@ -118,9 +125,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
// This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) {
if (pluginInfo.pkgName != null) {
ExpressibleHolder holder = new ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
streamFactory.withFunctionName(pluginInfo.name,
() -> holder.getClazz());
} else {
Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
streamFactory.withFunctionName(pluginInfo.name, clazz);
}
}
core.addCloseHook(new CloseHook() {
@Override
@ -135,6 +148,24 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
});
}
public static class ExpressibleHolder extends PackagePluginHolder {
private Class clazz;
public ExpressibleHolder(PluginInfo info, SolrCore core, SolrConfig.SolrPluginInfo pluginMeta) {
super(info, core, pluginMeta);
}
public Class getClazz() {
return clazz;
}
@Override
protected void initNewInstance(PackageLoader.Package.Version newest) {
clazz = newest.getLoader().findClass(pluginInfo.className, Expressible.class);
}
}
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams();
params = adjustParams(params);
@ -220,6 +251,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
String action = params.get("action").toLowerCase(Locale.ROOT).trim();
if ("plugins".equals(action)) {
rsp.add("plugins", (MapWriter) ew -> streamFactory.getFunctionNames().forEach((s, classSupplier) -> ew.putNoEx(s, classSupplier.get().getName())));
return;
}
if ("list".equals(action)) {
Collection<DaemonStream> vals = daemons.values();
@ -288,11 +323,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
public void close() {}
public void close() {
}
public void open() {}
public void open() {
}
public void setStreamContext(StreamContext context) {}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
@ -335,11 +373,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
public void close() {}
public void close() {
}
public void open() {}
public void open() {
}
public void setStreamContext(StreamContext context) {}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
@ -378,11 +419,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null;
}
public void close() {}
public void close() {
}
public void open() {}
public void open() {
}
public void setStreamContext(StreamContext context) {}
public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() {
return null;
@ -462,9 +506,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}
}
private Map<String,List<String>> getCollectionShards(SolrParams params) {
private Map<String, List<String>> getCollectionShards(SolrParams params) {
Map<String,List<String>> collectionShards = new HashMap();
Map<String, List<String>> collectionShards = new HashMap();
Iterator<String> paramsIt = params.getParameterNamesIterator();
while (paramsIt.hasNext()) {
String param = paramsIt.next();

View File

@ -104,12 +104,17 @@ public class PackagePluginHolder<T> extends PluginBag.PluginHolder<T> {
log.info("loading plugin: {} -> {} using package {}:{}",
pluginInfo.type, pluginInfo.name, pkg.name(), newest.getVersion());
initNewInstance(newest);
pkgVersion = newest;
}
protected void initNewInstance(PackageLoader.Package.Version newest) {
Object instance = SolrCore.createInstance(pluginInfo.className,
pluginMeta.clazz, pluginMeta.getCleanTag(), core, newest.getLoader());
PluginBag.initInstance(instance, pluginInfo);
T old = inst;
inst = (T) instance;
pkgVersion = newest;
if (old instanceof AutoCloseable) {
AutoCloseable closeable = (AutoCloseable) old;
try {

View File

@ -65,6 +65,10 @@ openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem testurp_v2.jar.bin | open
P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==
openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem expressible.jar.bin | openssl enc -base64 | openssl enc -base64 | tr -d \\n | sed
ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==
====================sha512====================
openssl dgst -sha512 runtimelibs.jar.bin
@ -95,6 +99,11 @@ openssl dgst -sha512 testurp_v1.jar.bin
openssl dgst -sha512 testurp_v2.jar.bin
5c4c0c454a032916e48a1c14a0fecbd6658658a66aedec5168b7222f2e3c0c63fbe09637238a9325ce2e95a2c8521834397a97701ead46c681aa20c9fccb6654
openssl dgst -sha512 expressible.jar.bin
3474a1414c8329c71ef5db2d3eb6e870363bdd7224a836aab561dccf5e8bcee4974ac799e72398c7e0b0c01972bab1c7454c8a4e791a8865bb676c0440627388
=============sha256============================
openssl dgst -sha256 runtimelibs.jar.bin

View File

@ -92,6 +92,7 @@ public class TestPackages extends SolrCloudTestCase {
String FILE3 = "/mypkg/runtimelibs_v3.jar";
String URP1 = "/mypkg/testurpv1.jar";
String URP2 = "/mypkg/testurpv2.jar";
String EXPR1 = "/mypkg/expressible.jar";
String COLLECTION_NAME = "testPluginLoadingColl";
byte[] derFile = readFile("cryptokeys/pub_key512.der");
cluster.getZkClient().makePath("/keys/exe", true);
@ -102,10 +103,13 @@ public class TestPackages extends SolrCloudTestCase {
postFileAndWait(cluster, "runtimecode/testurp_v1.jar.bin", URP1,
"h6UmMzuPqu4hQFGLBMJh/6kDSEXpJlgLsQDXx0KuxXWkV5giilRP57K3towiJRh2J+rqihqIghNCi3YgzgUnWQ==");
postFileAndWait(cluster, "runtimecode/expressible.jar.bin", EXPR1,
"ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==");
Package.AddVersion add = new Package.AddVersion();
add.version = "1.0";
add.pkg = "mypkg";
add.files = Arrays.asList(new String[]{FILE1, URP1});
add.files = Arrays.asList(new String[]{FILE1, URP1, EXPR1});
V2Request req = new V2Request.Builder("/cluster/package")
.forceV2(true)
.withMethod(SolrRequest.METHOD.POST)
@ -134,7 +138,8 @@ public class TestPackages extends SolrCloudTestCase {
"'create-requesthandler' : { 'name' : '/runtime', 'class': 'mypkg:org.apache.solr.core.RuntimeLibReqHandler' }," +
"'create-searchcomponent' : { 'name' : 'get', 'class': 'mypkg:org.apache.solr.core.RuntimeLibSearchComponent' }," +
"'create-queryResponseWriter' : { 'name' : 'json1', 'class': 'mypkg:org.apache.solr.core.RuntimeLibResponseWriter' }" +
"'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }" +
"'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }," +
" create-expressible: {name: mincopy , class: 'mypkg:org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric'}" +
"}";
cluster.getSolrClient().request(new ConfigRequest(payload) {
@Override
@ -159,6 +164,20 @@ public class TestPackages extends SolrCloudTestCase {
COLLECTION_NAME, "updateProcessor", "myurp",
"mypkg", "1.0" );
verifyCmponent(cluster.getSolrClient(),
COLLECTION_NAME, "expressible", "mincopy",
"mypkg", "1.0" );
TestDistribPackageStore.assertResponseValues(10,
cluster.getSolrClient() ,
new GenericSolrRequest(SolrRequest.METHOD.GET,
"/stream", new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
WT, JAVABIN,
"action", "plugins"
))), Utils.makeMap(
":plugins:mincopy", "org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric"
));
UpdateRequest ur = new UpdateRequest();
ur.add(new SolrInputDocument("id", "1"));
ur.setParam("processor", "myurp");
@ -192,7 +211,7 @@ public class TestPackages extends SolrCloudTestCase {
"P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==");
//add the version using package API
add.version = "1.1";
add.files = Arrays.asList(new String[]{FILE2,URP2});
add.files = Arrays.asList(new String[]{FILE2,URP2, EXPR1});
req.process(cluster.getSolrClient());
verifyCmponent(cluster.getSolrClient(),
@ -222,7 +241,7 @@ public class TestPackages extends SolrCloudTestCase {
"a400n4T7FT+2gM0SC6+MfSOExjud8MkhTSFylhvwNjtWwUgKdPFn434Wv7Qc4QEqDVLhQoL3WqYtQmLPti0G4Q==");
add.version = "2.1";
add.files = Arrays.asList(new String[]{FILE3, URP2});
add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient());
//now let's verify that the classes are updated
@ -304,7 +323,7 @@ public class TestPackages extends SolrCloudTestCase {
}.process(cluster.getSolrClient()) ;
add.version = "2.1";
add.files = Arrays.asList(new String[]{FILE3, URP2});
add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient());
//the collections mypkg is set to use version 1.1
@ -368,6 +387,21 @@ public class TestPackages extends SolrCloudTestCase {
}
}
/* new V2Request.Builder("/c/"+COLLECTIONORALIAS+"/config").withMethod(SolrRequest.METHOD.POST)
.withPayload("{add-expressible: {name: mincopy , class: org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric}}")
.build().process(cluster.getSolrClient());
ModifiableSolrParams _params = new ModifiableSolrParams();
QueryRequest query = new QueryRequest(new MapSolrParams("action","plugins", "collection", COLLECTIONORALIAS, "wt", "javabin"));
query.setPath("/stream");
NamedList<Object> rsp = cluster.getSolrClient().request(query);
assertEquals("org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric", rsp._getStr("/plugins/mincopy", null));
_params = new ModifiableSolrParams();
query = new QueryRequest(new MapSolrParams("componentName","mincopy", "meta" ,"true", "collection", COLLECTIONORALIAS, "wt", "javabin"));
query.setPath("/config/expressible");
rsp = cluster.getSolrClient().request(query);
System.out.println();*/
private void executeReq(String uri, JettySolrRunner jetty, Utils.InputStreamConsumer parser, Map expected) throws Exception {
try(HttpSolrClient client = (HttpSolrClient) jetty.newClient()){
@ -390,7 +424,6 @@ public class TestPackages extends SolrCloudTestCase {
"componentName", componentName,
"meta", "true"));
String s = "queryResponseWriter";
GenericSolrRequest req1 = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/config/" + componentType, params);
TestDistribPackageStore.assertResponseValues(10,

View File

@ -21,11 +21,13 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@ -44,7 +46,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric;
public class StreamFactory implements Serializable {
private transient HashMap<String,String> collectionZkHosts;
private transient HashMap<String,Class<? extends Expressible>> functionNames;
private transient HashMap<String,Supplier<Class<? extends Expressible>>> functionNames;
private transient String defaultZkHost;
private transient String defaultCollection;
@ -79,14 +81,20 @@ public class StreamFactory implements Serializable {
return null;
}
public Map<String,Class<? extends Expressible>> getFunctionNames(){
return functionNames;
public Map<String, Supplier<Class<? extends Expressible>>> getFunctionNames() {
return Collections.unmodifiableMap(functionNames);
}
public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz){
this.functionNames.put(functionName, () -> clazz);
return this;
}
public StreamFactory withFunctionName(String functionName, Supplier< Class<? extends Expressible>> clazz){
this.functionNames.put(functionName, clazz);
return this;
}
public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){
return null;
@ -173,10 +181,11 @@ public class StreamFactory implements Serializable {
List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
parameterLoop:
for(StreamExpression streamExpression : allStreamExpressions){
if(functionNames.containsKey(streamExpression.getFunctionName())){
for(Class clazz : clazzes){
if(!clazz.isAssignableFrom(functionNames.get(streamExpression.getFunctionName()))){
for(StreamExpression streamExpression : allStreamExpressions) {
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(streamExpression.getFunctionName());
if (classSupplier != null) {
for (Class clazz : clazzes) {
if (!clazz.isAssignableFrom(classSupplier.get())) {
continue parameterLoop;
}
}
@ -189,9 +198,10 @@ public class StreamFactory implements Serializable {
}
public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){
if(functionNames.containsKey(expression.getFunctionName())){
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(expression.getFunctionName());
if(classSupplier != null){
for(Class clazz : clazzes){
if(!clazz.isAssignableFrom(functionNames.get(expression.getFunctionName()))){
if(!clazz.isAssignableFrom(classSupplier.get())){
return false;
}
}
@ -241,10 +251,12 @@ public class StreamFactory implements Serializable {
}
public TupleStream constructStream(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
return (TupleStream)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
return (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
@ -256,10 +268,11 @@ public class StreamFactory implements Serializable {
}
public Metric constructMetric(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){
return (Metric)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
return (Metric)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
@ -356,16 +369,18 @@ public class StreamFactory implements Serializable {
public Metric constructOperation(String expressionClause) throws IOException {
return constructMetric(StreamExpressionParser.parse(expressionClause));
}
public StreamOperation constructOperation(StreamExpression expression) throws IOException{
public StreamOperation constructOperation(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
if(Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)){
return (StreamOperation)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if (classSupplier != null) {
Class<? extends Expressible> clazz = classSupplier.get();
if (Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)) {
return (StreamOperation) createInstance(clazz, new Class[]{StreamExpression.class, StreamFactory.class}, new Object[]{expression, this});
}
}
throw new IOException(String.format(Locale.ROOT,"Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName()));
throw new IOException(String.format(Locale.ROOT, "Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName()));
}
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException {
@ -373,21 +388,24 @@ public class StreamFactory implements Serializable {
}
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
}
}
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
}
public boolean isStream(StreamExpression expression) throws IOException{
public boolean isStream(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if (classSupplier != null) {
Class<? extends Expressible> clazz = classSupplier.get();
if (Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)) {
return true;
}
}
@ -395,11 +413,12 @@ public class StreamFactory implements Serializable {
return false;
}
public boolean isEvaluator(StreamExpression expression) throws IOException{
public boolean isEvaluator(StreamExpression expression) throws IOException {
String function = expression.getFunctionName();
if(functionNames.containsKey(function)){
Class<? extends Expressible> clazz = functionNames.get(function);
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
if (classSupplier != null) {
Class<? extends Expressible> clazz = classSupplier.get();
if (Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)) {
return true;
}
}
@ -407,29 +426,29 @@ public class StreamFactory implements Serializable {
return false;
}
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException {
Constructor<T> ctor;
try {
ctor = clazz.getConstructor(paramTypes);
return ctor.newInstance(params);
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
if(null != e.getMessage()){
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()),e);
}
else{
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
if (null != e.getMessage()) {
throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()), e);
} else {
throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s", clazz.getName()), e);
}
}
}
public String getFunctionName(Class<? extends Expressible> clazz) throws IOException{
for(Entry<String,Class<? extends Expressible>> entry : functionNames.entrySet()){
if(entry.getValue() == clazz){
public String getFunctionName(Class<? extends Expressible> clazz) throws IOException {
for (Entry<String, Supplier<Class<? extends Expressible>>> entry : functionNames.entrySet()) {
if (entry.getValue().get() == clazz) {
return entry.getKey();
}
}
throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
}

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -94,7 +95,7 @@ public class TestLang extends SolrTestCase {
}
StreamFactory factory = new StreamFactory();
Lang.register(factory);
Map<String,Class<? extends Expressible>> registeredFunctions = factory.getFunctionNames();
Map<String, Supplier<Class<? extends Expressible>>> registeredFunctions = factory.getFunctionNames();
//Check that each function that is expected is registered.
for(String func : functions) {