SOLR-12687: Add functions to cache data structures and mathematical models

This commit is contained in:
Joel Bernstein 2018-08-22 22:47:04 -04:00
parent f26dd13b34
commit b72ff3babb
9 changed files with 352 additions and 2 deletions

View File

@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.ModelCache; import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
@ -64,6 +66,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
static SolrClientCache clientCache = new SolrClientCache(); static SolrClientCache clientCache = new SolrClientCache();
static ModelCache modelCache = null; static ModelCache modelCache = null;
static ConcurrentMap objectCache = new ConcurrentHashMap();
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory(); private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
@ -164,6 +167,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.numWorkers = numWorkers; context.numWorkers = numWorkers;
context.setSolrClientCache(clientCache); context.setSolrClientCache(clientCache);
context.setModelCache(modelCache); context.setModelCache(modelCache);
context.setObjectCache(objectCache);
context.put("core", this.coreName); context.put("core", this.coreName);
context.put("solr-core", req.getCore()); context.put("solr-core", req.getCore());
tupleStream.setStreamContext(context); tupleStream.setStreamContext(context);

View File

@ -250,7 +250,10 @@ public class Lang {
.withFunctionName("gaussfit", GaussFitEvaluator.class) .withFunctionName("gaussfit", GaussFitEvaluator.class)
.withFunctionName("outliers", OutliersEvaluator.class) .withFunctionName("outliers", OutliersEvaluator.class)
.withFunctionName("stream", GetStream.class) .withFunctionName("stream", GetStream.class)
.withFunctionName("putCache", PutCacheEvaluator.class)
.withFunctionName("getCache", GetCacheEvaluator.class)
.withFunctionName("removeCache", RemoveCacheEvaluator.class)
.withFunctionName("listCache", ListCacheEvaluator.class)
// Boolean Stream Evaluators // Boolean Stream Evaluators
.withFunctionName("and", AndEvaluator.class) .withFunctionName("and", AndEvaluator.class)

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class GetCacheEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
public GetCacheEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(2 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 3 values but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object... values) throws IOException {
ConcurrentMap objectCache = this.streamContext.getObjectCache();
if(values.length == 2) {
String space = (String)values[0];
String key = (String)values[1];
space = space.replace("\"", "");
key = key.replace("\"", "");
ConcurrentMap spaceCache = (ConcurrentMap)objectCache.get(space);
if(spaceCache != null) {
return spaceCache.get(key);
}
return null;
} else {
throw new IOException("The getCache function requires two parameters: workspace and key");
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.ArrayList;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class ListCacheEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
public ListCacheEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(containedEvaluators.size() > 1){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting at most 1 values but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object... values) throws IOException {
ConcurrentMap objectCache = this.streamContext.getObjectCache();
List list = new ArrayList();
if(values.length == 0) {
ConcurrentHashMap m = (ConcurrentHashMap)objectCache;
Enumeration en = m.keys();
while(en.hasMoreElements()) {
list.add(en.nextElement());
}
return list;
} else if(values.length == 1) {
String space = (String)values[0];
space = space.replace("\"", "");
ConcurrentMap spaceCache = (ConcurrentMap)objectCache.get(space);
if(spaceCache != null) {
ConcurrentHashMap spaceMap = (ConcurrentHashMap)objectCache.get(space);
Enumeration en = spaceMap.keys();
while(en.hasMoreElements()) {
list.add(en.nextElement());
}
return list;
} else {
return list;
}
} else {
throw new IOException("The listCache function requires two parameters: workspace and key");
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class PutCacheEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
public PutCacheEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(3 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 3 values but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object... values) throws IOException {
ConcurrentMap objectCache = this.streamContext.getObjectCache();
if(values.length == 3) {
String space = (String)values[0];
String key = (String)values[1];
space = space.replace("\"", "");
key = key.replace("\"", "");
Object value = values[2];
ConcurrentMap spaceCache = (ConcurrentMap)objectCache.get(space);
if(spaceCache == null) {
spaceCache = new ConcurrentHashMap();
objectCache.put(space, spaceCache);
}
spaceCache.put(key, value);
return value;
} else {
throw new IOException("The putCache function requires three parameters: workspace, key and value");
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.eval;
import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class RemoveCacheEvaluator extends RecursiveObjectEvaluator implements ManyValueWorker {
protected static final long serialVersionUID = 1L;
public RemoveCacheEvaluator(StreamExpression expression, StreamFactory factory) throws IOException{
super(expression, factory);
if(2 != containedEvaluators.size()){
throw new IOException(String.format(Locale.ROOT,"Invalid expression %s - expecting exactly 3 values but found %d",expression,containedEvaluators.size()));
}
}
@Override
public Object doWork(Object... values) throws IOException {
ConcurrentMap objectCache = this.streamContext.getObjectCache();
if(values.length == 2) {
String space = (String)values[0];
String key = (String)values[1];
space = space.replace("\"", "");
key = key.replace("\"", "");
ConcurrentMap spaceCache = (ConcurrentMap)objectCache.get(space);
if(spaceCache != null) {
return spaceCache.remove(key);
}
return false;
} else {
throw new IOException("The removeCache function requires two parameters: workspace and key");
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.solr.client.solrj.io.ModelCache; import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
@ -38,12 +39,21 @@ public class StreamContext implements Serializable{
private Map entries = new HashMap(); private Map entries = new HashMap();
private Map tupleContext = new HashMap(); private Map tupleContext = new HashMap();
private Map<String, Object> lets = new HashMap(); private Map<String, Object> lets = new HashMap();
private ConcurrentMap objectCache;
public int workerID; public int workerID;
public int numWorkers; public int numWorkers;
private SolrClientCache clientCache; private SolrClientCache clientCache;
private ModelCache modelCache; private ModelCache modelCache;
private StreamFactory streamFactory; private StreamFactory streamFactory;
public ConcurrentMap getObjectCache() {
return this.objectCache;
}
public void setObjectCache(ConcurrentMap objectCache) {
this.objectCache = objectCache;
}
public Map<String, Object> getLets(){ public Map<String, Object> getLets(){
return lets; return lets;
} }

View File

@ -70,7 +70,7 @@ public class TestLang extends LuceneTestCase {
"mod", "ceil", "floor", "sin", "asin", "sinh", "cos", "acos", "cosh", "tan", "atan", "tanh", "round", "sqrt", "mod", "ceil", "floor", "sin", "asin", "sinh", "cos", "acos", "cosh", "tan", "atan", "tanh", "round", "sqrt",
"cbrt", "coalesce", "uuid", "if", "convert", "valueAt", "memset", "fft", "ifft", "euclidean","manhattan", "cbrt", "coalesce", "uuid", "if", "convert", "valueAt", "memset", "fft", "ifft", "euclidean","manhattan",
"earthMovers", "canberra", "chebyshev", "ones", "zeros", "setValue", "getValue", "knnRegress", "gaussfit", "earthMovers", "canberra", "chebyshev", "ones", "zeros", "setValue", "getValue", "knnRegress", "gaussfit",
"outliers", "stream"}; "outliers", "stream", "getCache", "putCache", "listCache", "removeCache"};
@Test @Test
public void testLang() { public void testLang() {

View File

@ -2728,6 +2728,91 @@ public class MathExpressionTest extends SolrCloudTestCase {
assertTrue(tuples.size() == 1); assertTrue(tuples.size() == 1);
Number dotProduct = (Number)tuples.get(0).get("return-value"); Number dotProduct = (Number)tuples.get(0).get("return-value");
assertTrue(dotProduct.doubleValue() == 182); assertTrue(dotProduct.doubleValue() == 182);
}
@Test
public void testCache() throws Exception {
String cexpr = "putCache(space1, key1, dotProduct(array(2,4,6,8,10,12),array(1,2,3,4,5,6)))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);
List<Tuple> tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
Number dotProduct = (Number)tuples.get(0).get("return-value");
assertTrue(dotProduct.doubleValue() == 182);
cexpr = "getCache(space1, key1)";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
dotProduct = (Number)tuples.get(0).get("return-value");
assertTrue(dotProduct.doubleValue() == 182);
cexpr = "listCache(space1)";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
List<String> keys = (List<String>)tuples.get(0).get("return-value");
assertEquals(keys.size(), 1);
assertEquals(keys.get(0), "key1");
cexpr = "listCache()";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
keys = (List<String>)tuples.get(0).get("return-value");
assertEquals(keys.size(), 1);
assertEquals(keys.get(0), "space1");
cexpr = "removeCache(space1, key1)";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
dotProduct = (Number)tuples.get(0).get("return-value");
assertTrue(dotProduct.doubleValue() == 182);
cexpr = "listCache(space1)";
paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
solrStream = new SolrStream(url, paramsLoc);
context = new StreamContext();
solrStream.setStreamContext(context);
tuples = getTuples(solrStream);
assertTrue(tuples.size() == 1);
keys = (List<String>)tuples.get(0).get("return-value");
assertEquals(keys.size(), 0);
} }
@Test @Test