SOLR-14125 : Streaming expressions to be loadable from packages

This commit is contained in:
noble 2019-12-23 15:30:11 +11:00
parent f74a62f37e
commit 04b0a5d8f3
10 changed files with 213 additions and 92 deletions

View File

@ -47,6 +47,8 @@ Improvements
--------------------- ---------------------
* SOLR-14042: Fix varargs precommit warnings (Andraas Salamon via Jason Gerlowski) * SOLR-14042: Fix varargs precommit warnings (Andraas Salamon via Jason Gerlowski)
* SOLR-14125: Make <expressible> plugins work with packages (noble)
Optimizations Optimizations
--------------------- ---------------------
(No changes) (No changes)

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler;
import java.io.IOException; import java.io.IOException;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -27,7 +28,9 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.Traversal; import org.apache.solr.client.solrj.io.graph.Traversal;
import org.apache.solr.client.solrj.io.stream.*; import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory; import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory;
import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -38,6 +41,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.SolrQueryResponse;
@ -90,11 +95,21 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
Object functionMappingsObj = initArgs.get("streamFunctions"); Object functionMappingsObj = initArgs.get("streamFunctions");
if(null != functionMappingsObj){ if(null != functionMappingsObj){
NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj; NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
for(Entry<String,?> functionMapping : functionMappings){ for(Entry<String,?> functionMapping : functionMappings) {
Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), String key = functionMapping.getKey();
PluginInfo pluginInfo = new PluginInfo(key, Collections.singletonMap("class", functionMapping.getValue()));
if (pluginInfo.pkgName == null) {
Class<? extends Expressible> clazz = core.getResourceLoader().findClass((String) functionMapping.getValue(),
Expressible.class); Expressible.class);
streamFactory.withFunctionName(functionMapping.getKey(), clazz); streamFactory.withFunctionName(key, clazz);
} else {
StreamHandler.ExpressibleHolder holder = new StreamHandler.ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
streamFactory.withFunctionName(key, () -> holder.getClazz());
} }
}
} }
} }

View File

@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
@ -588,9 +589,17 @@ public class SolrConfigHandler extends RequestHandlerBase implements SolrCoreAwa
private boolean verifyClass(CommandOperation op, String clz, Class expected) { private boolean verifyClass(CommandOperation op, String clz, Class expected) {
if (clz == null) return true; if (clz == null) return true;
if (!"true".equals(String.valueOf(op.getStr("runtimeLib", null)))) { if (!"true".equals(String.valueOf(op.getStr("runtimeLib", null)))) {
PluginInfo info = new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap());
//this is not dynamically loaded so we can verify the class right away //this is not dynamically loaded so we can verify the class right away
try { try {
req.getCore().createInitInstance(new PluginInfo(SolrRequestHandler.TYPE, op.getDataMap()), expected, clz, ""); if(expected == Expressible.class) {
SolrResourceLoader resourceLoader = info.pkgName == null ?
req.getCore().getResourceLoader() :
req.getCore().getResourceLoader(info.pkgName);
resourceLoader.findClass(info.className, expected);
} else {
req.getCore().createInitInstance(info, expected, clz, "");
}
} catch (Exception e) { } catch (Exception e) {
log.error("Error checking plugin : ", e); log.error("Error checking plugin : ", e);
op.addError(e.getMessage()); op.addError(e.getMessage());

View File

@ -33,7 +33,10 @@ import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.*; import org.apache.solr.client.solrj.io.stream.DaemonStream;
import org.apache.solr.client.solrj.io.stream.ExceptionStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.Expressible;
@ -44,6 +47,7 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController; import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
@ -52,7 +56,10 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CloseHook; import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo; import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.pkg.PackageLoader;
import org.apache.solr.pkg.PackagePluginHolder;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext; import org.apache.solr.security.AuthorizationContext;
@ -87,7 +94,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory(); private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
private Map<String,DaemonStream> daemons = Collections.synchronizedMap(new HashMap()); private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override @Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) { public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@ -118,9 +125,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
// This pulls all the overrides and additions from the config // This pulls all the overrides and additions from the config
List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName()); List<PluginInfo> pluginInfos = core.getSolrConfig().getPluginInfos(Expressible.class.getName());
for (PluginInfo pluginInfo : pluginInfos) { for (PluginInfo pluginInfo : pluginInfos) {
if (pluginInfo.pkgName != null) {
ExpressibleHolder holder = new ExpressibleHolder(pluginInfo, core, SolrConfig.classVsSolrPluginInfo.get(Expressible.class));
streamFactory.withFunctionName(pluginInfo.name,
() -> holder.getClazz());
} else {
Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class); Class<? extends Expressible> clazz = core.getMemClassLoader().findClass(pluginInfo.className, Expressible.class);
streamFactory.withFunctionName(pluginInfo.name, clazz); streamFactory.withFunctionName(pluginInfo.name, clazz);
} }
}
core.addCloseHook(new CloseHook() { core.addCloseHook(new CloseHook() {
@Override @Override
@ -135,6 +148,24 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
}); });
} }
public static class ExpressibleHolder extends PackagePluginHolder {
private Class clazz;
public ExpressibleHolder(PluginInfo info, SolrCore core, SolrConfig.SolrPluginInfo pluginMeta) {
super(info, core, pluginMeta);
}
public Class getClazz() {
return clazz;
}
@Override
protected void initNewInstance(PackageLoader.Package.Version newest) {
clazz = newest.getLoader().findClass(pluginInfo.className, Expressible.class);
}
}
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
SolrParams params = req.getParams(); SolrParams params = req.getParams();
params = adjustParams(params); params = adjustParams(params);
@ -218,6 +249,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) { private void handleAdmin(SolrQueryRequest req, SolrQueryResponse rsp, SolrParams params) {
String action = params.get("action").toLowerCase(Locale.ROOT).trim(); String action = params.get("action").toLowerCase(Locale.ROOT).trim();
if ("plugins".equals(action)) {
rsp.add("plugins", (MapWriter) ew -> streamFactory.getFunctionNames().forEach((s, classSupplier) -> ew.putNoEx(s, classSupplier.get().getName())));
return;
}
if ("list".equals(action)) { if ("list".equals(action)) {
Collection<DaemonStream> vals = daemons.values(); Collection<DaemonStream> vals = daemons.values();
@ -286,11 +321,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null; return null;
} }
public void close() {} public void close() {
}
public void open() {} public void open() {
}
public void setStreamContext(StreamContext context) {} public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() { public List<TupleStream> children() {
return null; return null;
@ -333,11 +371,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null; return null;
} }
public void close() {} public void close() {
}
public void open() {} public void open() {
}
public void setStreamContext(StreamContext context) {} public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() { public List<TupleStream> children() {
return null; return null;
@ -376,11 +417,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return null; return null;
} }
public void close() {} public void close() {
}
public void open() {} public void open() {
}
public void setStreamContext(StreamContext context) {} public void setStreamContext(StreamContext context) {
}
public List<TupleStream> children() { public List<TupleStream> children() {
return null; return null;
@ -460,9 +504,9 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
} }
} }
private Map<String,List<String>> getCollectionShards(SolrParams params) { private Map<String, List<String>> getCollectionShards(SolrParams params) {
Map<String,List<String>> collectionShards = new HashMap(); Map<String, List<String>> collectionShards = new HashMap();
Iterator<String> paramsIt = params.getParameterNamesIterator(); Iterator<String> paramsIt = params.getParameterNamesIterator();
while (paramsIt.hasNext()) { while (paramsIt.hasNext()) {
String param = paramsIt.next(); String param = paramsIt.next();

View File

@ -104,12 +104,17 @@ public class PackagePluginHolder<T> extends PluginBag.PluginHolder<T> {
log.info("loading plugin: {} -> {} using package {}:{}", log.info("loading plugin: {} -> {} using package {}:{}",
pluginInfo.type, pluginInfo.name, pkg.name(), newest.getVersion()); pluginInfo.type, pluginInfo.name, pkg.name(), newest.getVersion());
initNewInstance(newest);
pkgVersion = newest;
}
protected void initNewInstance(PackageLoader.Package.Version newest) {
Object instance = SolrCore.createInstance(pluginInfo.className, Object instance = SolrCore.createInstance(pluginInfo.className,
pluginMeta.clazz, pluginMeta.getCleanTag(), core, newest.getLoader()); pluginMeta.clazz, pluginMeta.getCleanTag(), core, newest.getLoader());
PluginBag.initInstance(instance, pluginInfo); PluginBag.initInstance(instance, pluginInfo);
T old = inst; T old = inst;
inst = (T) instance; inst = (T) instance;
pkgVersion = newest;
if (old instanceof AutoCloseable) { if (old instanceof AutoCloseable) {
AutoCloseable closeable = (AutoCloseable) old; AutoCloseable closeable = (AutoCloseable) old;
try { try {

View File

@ -65,6 +65,10 @@ openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem testurp_v2.jar.bin | open
P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw== P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==
openssl dgst -sha1 -sign ../cryptokeys/priv_key512.pem expressible.jar.bin | openssl enc -base64 | openssl enc -base64 | tr -d \\n | sed
ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==
====================sha512==================== ====================sha512====================
openssl dgst -sha512 runtimelibs.jar.bin openssl dgst -sha512 runtimelibs.jar.bin
@ -95,6 +99,11 @@ openssl dgst -sha512 testurp_v1.jar.bin
openssl dgst -sha512 testurp_v2.jar.bin openssl dgst -sha512 testurp_v2.jar.bin
5c4c0c454a032916e48a1c14a0fecbd6658658a66aedec5168b7222f2e3c0c63fbe09637238a9325ce2e95a2c8521834397a97701ead46c681aa20c9fccb6654 5c4c0c454a032916e48a1c14a0fecbd6658658a66aedec5168b7222f2e3c0c63fbe09637238a9325ce2e95a2c8521834397a97701ead46c681aa20c9fccb6654
openssl dgst -sha512 expressible.jar.bin
3474a1414c8329c71ef5db2d3eb6e870363bdd7224a836aab561dccf5e8bcee4974ac799e72398c7e0b0c01972bab1c7454c8a4e791a8865bb676c0440627388
=============sha256============================ =============sha256============================
openssl dgst -sha256 runtimelibs.jar.bin openssl dgst -sha256 runtimelibs.jar.bin

View File

@ -92,6 +92,7 @@ public class TestPackages extends SolrCloudTestCase {
String FILE3 = "/mypkg/runtimelibs_v3.jar"; String FILE3 = "/mypkg/runtimelibs_v3.jar";
String URP1 = "/mypkg/testurpv1.jar"; String URP1 = "/mypkg/testurpv1.jar";
String URP2 = "/mypkg/testurpv2.jar"; String URP2 = "/mypkg/testurpv2.jar";
String EXPR1 = "/mypkg/expressible.jar";
String COLLECTION_NAME = "testPluginLoadingColl"; String COLLECTION_NAME = "testPluginLoadingColl";
byte[] derFile = readFile("cryptokeys/pub_key512.der"); byte[] derFile = readFile("cryptokeys/pub_key512.der");
cluster.getZkClient().makePath("/keys/exe", true); cluster.getZkClient().makePath("/keys/exe", true);
@ -102,10 +103,13 @@ public class TestPackages extends SolrCloudTestCase {
postFileAndWait(cluster, "runtimecode/testurp_v1.jar.bin", URP1, postFileAndWait(cluster, "runtimecode/testurp_v1.jar.bin", URP1,
"h6UmMzuPqu4hQFGLBMJh/6kDSEXpJlgLsQDXx0KuxXWkV5giilRP57K3towiJRh2J+rqihqIghNCi3YgzgUnWQ=="); "h6UmMzuPqu4hQFGLBMJh/6kDSEXpJlgLsQDXx0KuxXWkV5giilRP57K3towiJRh2J+rqihqIghNCi3YgzgUnWQ==");
postFileAndWait(cluster, "runtimecode/expressible.jar.bin", EXPR1,
"ZOT11arAiPmPZYOHzqodiNnxO9pRyRozWZEBX8XGjU1/HJptFnZK+DI7eXnUtbNaMcbXE2Ze8hh4M/eGyhY8BQ==");
Package.AddVersion add = new Package.AddVersion(); Package.AddVersion add = new Package.AddVersion();
add.version = "1.0"; add.version = "1.0";
add.pkg = "mypkg"; add.pkg = "mypkg";
add.files = Arrays.asList(new String[]{FILE1, URP1}); add.files = Arrays.asList(new String[]{FILE1, URP1, EXPR1});
V2Request req = new V2Request.Builder("/cluster/package") V2Request req = new V2Request.Builder("/cluster/package")
.forceV2(true) .forceV2(true)
.withMethod(SolrRequest.METHOD.POST) .withMethod(SolrRequest.METHOD.POST)
@ -134,7 +138,8 @@ public class TestPackages extends SolrCloudTestCase {
"'create-requesthandler' : { 'name' : '/runtime', 'class': 'mypkg:org.apache.solr.core.RuntimeLibReqHandler' }," + "'create-requesthandler' : { 'name' : '/runtime', 'class': 'mypkg:org.apache.solr.core.RuntimeLibReqHandler' }," +
"'create-searchcomponent' : { 'name' : 'get', 'class': 'mypkg:org.apache.solr.core.RuntimeLibSearchComponent' }," + "'create-searchcomponent' : { 'name' : 'get', 'class': 'mypkg:org.apache.solr.core.RuntimeLibSearchComponent' }," +
"'create-queryResponseWriter' : { 'name' : 'json1', 'class': 'mypkg:org.apache.solr.core.RuntimeLibResponseWriter' }" + "'create-queryResponseWriter' : { 'name' : 'json1', 'class': 'mypkg:org.apache.solr.core.RuntimeLibResponseWriter' }" +
"'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }" + "'create-updateProcessor' : { 'name' : 'myurp', 'class': 'mypkg:org.apache.solr.update.TestVersionedURP' }," +
" create-expressible: {name: mincopy , class: 'mypkg:org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric'}" +
"}"; "}";
cluster.getSolrClient().request(new ConfigRequest(payload) { cluster.getSolrClient().request(new ConfigRequest(payload) {
@Override @Override
@ -159,6 +164,20 @@ public class TestPackages extends SolrCloudTestCase {
COLLECTION_NAME, "updateProcessor", "myurp", COLLECTION_NAME, "updateProcessor", "myurp",
"mypkg", "1.0" ); "mypkg", "1.0" );
verifyCmponent(cluster.getSolrClient(),
COLLECTION_NAME, "expressible", "mincopy",
"mypkg", "1.0" );
TestDistribPackageStore.assertResponseValues(10,
cluster.getSolrClient() ,
new GenericSolrRequest(SolrRequest.METHOD.GET,
"/stream", new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
WT, JAVABIN,
"action", "plugins"
))), Utils.makeMap(
":plugins:mincopy", "org.apache.solr.client.solrj.io.stream.metrics.MinCopyMetric"
));
UpdateRequest ur = new UpdateRequest(); UpdateRequest ur = new UpdateRequest();
ur.add(new SolrInputDocument("id", "1")); ur.add(new SolrInputDocument("id", "1"));
ur.setParam("processor", "myurp"); ur.setParam("processor", "myurp");
@ -192,7 +211,7 @@ public class TestPackages extends SolrCloudTestCase {
"P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw=="); "P/ptFXRvQMd4oKPvadSpd+A9ffwY3gcex5GVFVRy3df0/OF8XT5my8rQz7FZva+2ORbWxdXS8NKwNrbPVHLGXw==");
//add the version using package API //add the version using package API
add.version = "1.1"; add.version = "1.1";
add.files = Arrays.asList(new String[]{FILE2,URP2}); add.files = Arrays.asList(new String[]{FILE2,URP2, EXPR1});
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
verifyCmponent(cluster.getSolrClient(), verifyCmponent(cluster.getSolrClient(),
@ -222,7 +241,7 @@ public class TestPackages extends SolrCloudTestCase {
"a400n4T7FT+2gM0SC6+MfSOExjud8MkhTSFylhvwNjtWwUgKdPFn434Wv7Qc4QEqDVLhQoL3WqYtQmLPti0G4Q=="); "a400n4T7FT+2gM0SC6+MfSOExjud8MkhTSFylhvwNjtWwUgKdPFn434Wv7Qc4QEqDVLhQoL3WqYtQmLPti0G4Q==");
add.version = "2.1"; add.version = "2.1";
add.files = Arrays.asList(new String[]{FILE3, URP2}); add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
//now let's verify that the classes are updated //now let's verify that the classes are updated
@ -304,7 +323,7 @@ public class TestPackages extends SolrCloudTestCase {
}.process(cluster.getSolrClient()) ; }.process(cluster.getSolrClient()) ;
add.version = "2.1"; add.version = "2.1";
add.files = Arrays.asList(new String[]{FILE3, URP2}); add.files = Arrays.asList(new String[]{FILE3, URP2, EXPR1});
req.process(cluster.getSolrClient()); req.process(cluster.getSolrClient());
//the collections mypkg is set to use version 1.1 //the collections mypkg is set to use version 1.1
@ -368,7 +387,6 @@ public class TestPackages extends SolrCloudTestCase {
} }
} }
private void executeReq(String uri, JettySolrRunner jetty, Utils.InputStreamConsumer parser, Map expected) throws Exception { private void executeReq(String uri, JettySolrRunner jetty, Utils.InputStreamConsumer parser, Map expected) throws Exception {
try(HttpSolrClient client = (HttpSolrClient) jetty.newClient()){ try(HttpSolrClient client = (HttpSolrClient) jetty.newClient()){
TestDistribPackageStore.assertResponseValues(10, TestDistribPackageStore.assertResponseValues(10,
@ -390,7 +408,6 @@ public class TestPackages extends SolrCloudTestCase {
"componentName", componentName, "componentName", componentName,
"meta", "true")); "meta", "true"));
String s = "queryResponseWriter";
GenericSolrRequest req1 = new GenericSolrRequest(SolrRequest.METHOD.GET, GenericSolrRequest req1 = new GenericSolrRequest(SolrRequest.METHOD.GET,
"/config/" + componentType, params); "/config/" + componentType, params);
TestDistribPackageStore.assertResponseValues(10, TestDistribPackageStore.assertResponseValues(10,

View File

@ -21,11 +21,13 @@ import java.io.Serializable;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder; import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
@ -44,7 +46,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.Metric;
public class StreamFactory implements Serializable { public class StreamFactory implements Serializable {
private transient HashMap<String,String> collectionZkHosts; private transient HashMap<String,String> collectionZkHosts;
private transient HashMap<String,Class<? extends Expressible>> functionNames; private transient HashMap<String,Supplier<Class<? extends Expressible>>> functionNames;
private transient String defaultZkHost; private transient String defaultZkHost;
private transient String defaultCollection; private transient String defaultCollection;
@ -79,14 +81,20 @@ public class StreamFactory implements Serializable {
return null; return null;
} }
public Map<String,Class<? extends Expressible>> getFunctionNames(){ public Map<String, Supplier<Class<? extends Expressible>>> getFunctionNames() {
return functionNames; return Collections.unmodifiableMap(functionNames);
} }
public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz){ public StreamFactory withFunctionName(String functionName, Class<? extends Expressible> clazz){
this.functionNames.put(functionName, () -> clazz);
return this;
}
public StreamFactory withFunctionName(String functionName, Supplier< Class<? extends Expressible>> clazz){
this.functionNames.put(functionName, clazz); this.functionNames.put(functionName, clazz);
return this; return this;
} }
public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){ public StreamExpressionParameter getOperand(StreamExpression expression, int parameterIndex){
if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){ if(null == expression.getParameters() || parameterIndex >= expression.getParameters().size()){
return null; return null;
@ -173,10 +181,11 @@ public class StreamFactory implements Serializable {
List<StreamExpression> allStreamExpressions = getExpressionOperands(expression); List<StreamExpression> allStreamExpressions = getExpressionOperands(expression);
parameterLoop: parameterLoop:
for(StreamExpression streamExpression : allStreamExpressions){ for(StreamExpression streamExpression : allStreamExpressions) {
if(functionNames.containsKey(streamExpression.getFunctionName())){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(streamExpression.getFunctionName());
for(Class clazz : clazzes){ if (classSupplier != null) {
if(!clazz.isAssignableFrom(functionNames.get(streamExpression.getFunctionName()))){ for (Class clazz : clazzes) {
if (!clazz.isAssignableFrom(classSupplier.get())) {
continue parameterLoop; continue parameterLoop;
} }
} }
@ -189,9 +198,10 @@ public class StreamFactory implements Serializable {
} }
public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){ public boolean doesRepresentTypes(StreamExpression expression, Class ... clazzes){
if(functionNames.containsKey(expression.getFunctionName())){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(expression.getFunctionName());
if(classSupplier != null){
for(Class clazz : clazzes){ for(Class clazz : clazzes){
if(!clazz.isAssignableFrom(functionNames.get(expression.getFunctionName()))){ if(!clazz.isAssignableFrom(classSupplier.get())){
return false; return false;
} }
} }
@ -241,10 +251,12 @@ public class StreamFactory implements Serializable {
} }
public TupleStream constructStream(StreamExpression expression) throws IOException{ public TupleStream constructStream(StreamExpression expression) throws IOException{
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function);
if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){ if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){
return (TupleStream)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); return (TupleStream)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
} }
} }
@ -256,10 +268,11 @@ public class StreamFactory implements Serializable {
} }
public Metric constructMetric(StreamExpression expression) throws IOException{ public Metric constructMetric(StreamExpression expression) throws IOException{
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function); if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){ if(Expressible.class.isAssignableFrom(clazz) && Metric.class.isAssignableFrom(clazz)){
return (Metric)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); return (Metric)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
} }
} }
@ -356,16 +369,18 @@ public class StreamFactory implements Serializable {
public Metric constructOperation(String expressionClause) throws IOException { public Metric constructOperation(String expressionClause) throws IOException {
return constructMetric(StreamExpressionParser.parse(expressionClause)); return constructMetric(StreamExpressionParser.parse(expressionClause));
} }
public StreamOperation constructOperation(StreamExpression expression) throws IOException{
public StreamOperation constructOperation(StreamExpression expression) throws IOException {
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function); if (classSupplier != null) {
if(Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)){ Class<? extends Expressible> clazz = classSupplier.get();
return (StreamOperation)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); if (Expressible.class.isAssignableFrom(clazz) && StreamOperation.class.isAssignableFrom(clazz)) {
return (StreamOperation) createInstance(clazz, new Class[]{StreamExpression.class, StreamFactory.class}, new Object[]{expression, this});
} }
} }
throw new IOException(String.format(Locale.ROOT,"Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName())); throw new IOException(String.format(Locale.ROOT, "Invalid operation expression %s - function '%s' is unknown (not mapped to a valid StreamOperation)", expression, expression.getFunctionName()));
} }
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException { public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(String expressionClause) throws IOException {
@ -373,21 +388,24 @@ public class StreamFactory implements Serializable {
} }
public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{ public org.apache.solr.client.solrj.io.eval.StreamEvaluator constructEvaluator(StreamExpression expression) throws IOException{
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function);
if(classSupplier != null){
Class<? extends Expressible> clazz = classSupplier.get();
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){ if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){
return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(functionNames.get(function), new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this}); return (org.apache.solr.client.solrj.io.eval.StreamEvaluator)createInstance(clazz, new Class[]{ StreamExpression.class, StreamFactory.class }, new Object[]{ expression, this});
} }
} }
throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName())); throw new IOException(String.format(Locale.ROOT,"Invalid evaluator expression %s - function '%s' is unknown (not mapped to a valid StreamEvaluator)", expression, expression.getFunctionName()));
} }
public boolean isStream(StreamExpression expression) throws IOException{ public boolean isStream(StreamExpression expression) throws IOException {
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function); if (classSupplier != null) {
if(Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)){ Class<? extends Expressible> clazz = classSupplier.get();
if (Expressible.class.isAssignableFrom(clazz) && TupleStream.class.isAssignableFrom(clazz)) {
return true; return true;
} }
} }
@ -395,11 +413,12 @@ public class StreamFactory implements Serializable {
return false; return false;
} }
public boolean isEvaluator(StreamExpression expression) throws IOException{ public boolean isEvaluator(StreamExpression expression) throws IOException {
String function = expression.getFunctionName(); String function = expression.getFunctionName();
if(functionNames.containsKey(function)){ Supplier<Class<? extends Expressible>> classSupplier = functionNames.get(function);
Class<? extends Expressible> clazz = functionNames.get(function); if (classSupplier != null) {
if(Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)){ Class<? extends Expressible> clazz = classSupplier.get();
if (Expressible.class.isAssignableFrom(clazz) && StreamEvaluator.class.isAssignableFrom(clazz)) {
return true; return true;
} }
} }
@ -407,29 +426,29 @@ public class StreamFactory implements Serializable {
return false; return false;
} }
public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException{ public <T> T createInstance(Class<T> clazz, Class<?>[] paramTypes, Object[] params) throws IOException {
Constructor<T> ctor; Constructor<T> ctor;
try { try {
ctor = clazz.getConstructor(paramTypes); ctor = clazz.getConstructor(paramTypes);
return ctor.newInstance(params); return ctor.newInstance(params);
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
if(null != e.getMessage()){ if (null != e.getMessage()) {
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()),e); throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s caused by %s", clazz.getName(), e.getMessage()), e);
} } else {
else{ throw new IOException(String.format(Locale.ROOT, "Unable to construct instance of %s", clazz.getName()), e);
throw new IOException(String.format(Locale.ROOT,"Unable to construct instance of %s", clazz.getName()),e);
} }
} }
} }
public String getFunctionName(Class<? extends Expressible> clazz) throws IOException{ public String getFunctionName(Class<? extends Expressible> clazz) throws IOException {
for(Entry<String,Class<? extends Expressible>> entry : functionNames.entrySet()){ for (Entry<String, Supplier<Class<? extends Expressible>>> entry : functionNames.entrySet()) {
if(entry.getValue() == clazz){ if (entry.getValue().get() == clazz) {
return entry.getKey(); return entry.getKey();
} }
} }
throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName())); throw new IOException(String.format(Locale.ROOT, "Unable to find function name for class '%s'", clazz.getName()));
} }

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
@ -94,7 +95,7 @@ public class TestLang extends SolrTestCase {
} }
StreamFactory factory = new StreamFactory(); StreamFactory factory = new StreamFactory();
Lang.register(factory); Lang.register(factory);
Map<String,Class<? extends Expressible>> registeredFunctions = factory.getFunctionNames(); Map<String, Supplier<Class<? extends Expressible>>> registeredFunctions = factory.getFunctionNames();
//Check that each function that is expected is registered. //Check that each function that is expected is registered.
for(String func : functions) { for(String func : functions) {