mirror of https://github.com/apache/lucene.git
SOLR-8972: Add GraphHandler and GraphMLResponseWriter to support graph visualizations
This commit is contained in:
parent
7d4f387384
commit
be1cb9a1cd
|
@ -2111,6 +2111,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
|||
m.put("standard", m.get("xml"));
|
||||
m.put(CommonParams.JSON, new JSONResponseWriter());
|
||||
m.put("geojson", new GeoJSONResponseWriter());
|
||||
m.put("graphml", new GraphMLResponseWriter());
|
||||
m.put("python", new PythonResponseWriter());
|
||||
m.put("php", new PHPResponseWriter());
|
||||
m.put("phps", new PHPSerializedResponseWriter());
|
||||
|
|
|
@ -0,0 +1,282 @@
|
|||
package org.apache.solr.handler;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.solr.client.solrj.io.SolrClientCache;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.graph.GatherNodesStream;
|
||||
import org.apache.solr.client.solrj.io.graph.ShortestPathStream;
|
||||
import org.apache.solr.client.solrj.io.graph.Traversal;
|
||||
import org.apache.solr.client.solrj.io.ops.ConcatOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.DistinctOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.GroupOperation;
|
||||
import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
|
||||
import org.apache.solr.client.solrj.io.stream.*;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CloseHook;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.security.AuthorizationContext;
|
||||
import org.apache.solr.security.PermissionNameProvider;
|
||||
import org.apache.solr.util.plugin.SolrCoreAware;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
|
||||
|
||||
private StreamFactory streamFactory = new StreamFactory();
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
private String coreName;
|
||||
|
||||
@Override
|
||||
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
|
||||
return PermissionNameProvider.Name.READ_PERM;
|
||||
}
|
||||
|
||||
public void inform(SolrCore core) {
|
||||
|
||||
/* The stream factory will always contain the zkUrl for the given collection
|
||||
* Adds default streams with their corresponding function names. These
|
||||
* defaults can be overridden or added to in the solrConfig in the stream
|
||||
* RequestHandler def. Example config override
|
||||
* <lst name="streamFunctions">
|
||||
* <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
|
||||
* <str name="count">org.apache.solr.client.solrj.io.stream.RecordCountStream</str>
|
||||
* </lst>
|
||||
* */
|
||||
|
||||
String defaultCollection = null;
|
||||
String defaultZkhost = null;
|
||||
CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
|
||||
this.coreName = core.getName();
|
||||
|
||||
if(coreContainer.isZooKeeperAware()) {
|
||||
defaultCollection = core.getCoreDescriptor().getCollectionName();
|
||||
defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
|
||||
streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
|
||||
streamFactory.withDefaultZkHost(defaultZkhost);
|
||||
}
|
||||
|
||||
streamFactory
|
||||
// streams
|
||||
.withFunctionName("search", CloudSolrStream.class)
|
||||
.withFunctionName("merge", MergeStream.class)
|
||||
.withFunctionName("unique", UniqueStream.class)
|
||||
.withFunctionName("top", RankStream.class)
|
||||
.withFunctionName("group", GroupOperation.class)
|
||||
.withFunctionName("reduce", ReducerStream.class)
|
||||
.withFunctionName("parallel", ParallelStream.class)
|
||||
.withFunctionName("rollup", RollupStream.class)
|
||||
.withFunctionName("stats", StatsStream.class)
|
||||
.withFunctionName("innerJoin", InnerJoinStream.class)
|
||||
.withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
|
||||
.withFunctionName("hashJoin", HashJoinStream.class)
|
||||
.withFunctionName("outerHashJoin", OuterHashJoinStream.class)
|
||||
.withFunctionName("facet", FacetStream.class)
|
||||
.withFunctionName("update", UpdateStream.class)
|
||||
.withFunctionName("jdbc", JDBCStream.class)
|
||||
.withFunctionName("intersect", IntersectStream.class)
|
||||
.withFunctionName("complement", ComplementStream.class)
|
||||
.withFunctionName("daemon", DaemonStream.class)
|
||||
.withFunctionName("topic", TopicStream.class)
|
||||
.withFunctionName("shortestPath", ShortestPathStream.class)
|
||||
.withFunctionName("gatherNodes", GatherNodesStream.class)
|
||||
.withFunctionName("sort", SortStream.class)
|
||||
|
||||
|
||||
// metrics
|
||||
.withFunctionName("min", MinMetric.class)
|
||||
.withFunctionName("max", MaxMetric.class)
|
||||
.withFunctionName("avg", MeanMetric.class)
|
||||
.withFunctionName("sum", SumMetric.class)
|
||||
.withFunctionName("count", CountMetric.class)
|
||||
|
||||
// tuple manipulation operations
|
||||
.withFunctionName("replace", ReplaceOperation.class)
|
||||
.withFunctionName("concat", ConcatOperation.class)
|
||||
|
||||
// stream reduction operations
|
||||
.withFunctionName("group", GroupOperation.class)
|
||||
.withFunctionName("distinct", DistinctOperation.class);
|
||||
|
||||
// This pulls all the overrides and additions from the config
|
||||
Object functionMappingsObj = initArgs.get("streamFunctions");
|
||||
if(null != functionMappingsObj){
|
||||
NamedList<?> functionMappings = (NamedList<?>)functionMappingsObj;
|
||||
for(Entry<String,?> functionMapping : functionMappings){
|
||||
Class<?> clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), Expressible.class);
|
||||
streamFactory.withFunctionName(functionMapping.getKey(), clazz);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||
SolrParams params = req.getParams();
|
||||
params = adjustParams(params);
|
||||
req.setParams(params);
|
||||
|
||||
|
||||
TupleStream tupleStream = null;
|
||||
|
||||
try {
|
||||
tupleStream = this.streamFactory.constructStream(params.get("expr"));
|
||||
} catch (Exception e) {
|
||||
//Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
|
||||
SolrException.log(logger, e);
|
||||
Map requestContext = req.getContext();
|
||||
requestContext.put("stream", new DummyErrorStream(e));
|
||||
return;
|
||||
}
|
||||
|
||||
StreamContext context = new StreamContext();
|
||||
context.setSolrClientCache(StreamHandler.clientCache);
|
||||
context.put("core", this.coreName);
|
||||
Traversal traversal = new Traversal();
|
||||
context.put("traversal", traversal);
|
||||
tupleStream.setStreamContext(context);
|
||||
Map requestContext = req.getContext();
|
||||
requestContext.put("stream", new TimerStream(new ExceptionStream(tupleStream)));
|
||||
requestContext.put("traversal", traversal);
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return "StreamHandler";
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public static class DummyErrorStream extends TupleStream {
|
||||
private Exception e;
|
||||
|
||||
public DummyErrorStream(Exception e) {
|
||||
this.e = e;
|
||||
}
|
||||
public StreamComparator getStreamSort() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public void open() {
|
||||
}
|
||||
|
||||
public Exception getException() {
|
||||
return this.e;
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Explanation toExplanation(StreamFactory factory) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Tuple read() {
|
||||
String msg = e.getMessage();
|
||||
Map m = new HashMap();
|
||||
m.put("EOF", true);
|
||||
m.put("EXCEPTION", msg);
|
||||
return new Tuple(m);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private SolrParams adjustParams(SolrParams params) {
|
||||
ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
|
||||
adjustedParams.add(params);
|
||||
adjustedParams.add(CommonParams.OMIT_HEADER, "true");
|
||||
return adjustedParams;
|
||||
}
|
||||
|
||||
public static class TimerStream extends TupleStream {
|
||||
|
||||
private long begin;
|
||||
private TupleStream tupleStream;
|
||||
|
||||
public TimerStream(TupleStream tupleStream) {
|
||||
this.tupleStream = tupleStream;
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort() {
|
||||
return this.tupleStream.getStreamSort();
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
this.tupleStream.close();
|
||||
}
|
||||
|
||||
public void open() throws IOException {
|
||||
this.begin = System.nanoTime();
|
||||
this.tupleStream.open();
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext context) {
|
||||
this.tupleStream.setStreamContext(context);
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
return this.tupleStream.children();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Explanation toExplanation(StreamFactory factory) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Tuple read() throws IOException {
|
||||
Tuple tuple = this.tupleStream.read();
|
||||
if(tuple.EOF) {
|
||||
long totalTime = (System.nanoTime() - begin) / 1000000;
|
||||
tuple.fields.put("RESPONSE_TIME", totalTime);
|
||||
}
|
||||
return tuple;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,167 @@
|
|||
package org.apache.solr.response;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.Writer;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.solr.client.solrj.io.graph.Traversal;
|
||||
import org.apache.solr.client.solrj.io.stream.TupleStream;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.handler.GraphHandler;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public class GraphMLResponseWriter implements QueryResponseWriter {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public void init(NamedList args) {
|
||||
/* NOOP */
|
||||
}
|
||||
|
||||
public String getContentType(SolrQueryRequest req, SolrQueryResponse res) {
|
||||
return "application/xml";
|
||||
}
|
||||
|
||||
public void write(Writer writer, SolrQueryRequest req, SolrQueryResponse res) throws IOException {
|
||||
|
||||
Exception e1 = res.getException();
|
||||
if(e1 != null) {
|
||||
e1.printStackTrace(new PrintWriter(writer));
|
||||
return;
|
||||
}
|
||||
|
||||
TupleStream stream = (TupleStream)req.getContext().get("stream");
|
||||
|
||||
if(stream instanceof GraphHandler.DummyErrorStream) {
|
||||
GraphHandler.DummyErrorStream d = (GraphHandler.DummyErrorStream)stream;
|
||||
Exception e = d.getException();
|
||||
e.printStackTrace(new PrintWriter(writer));
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Traversal traversal = (Traversal)req.getContext().get("traversal");
|
||||
PrintWriter printWriter = new PrintWriter(writer);
|
||||
|
||||
try {
|
||||
|
||||
stream.open();
|
||||
|
||||
Tuple tuple = null;
|
||||
|
||||
int edgeCount = 0;
|
||||
|
||||
printWriter.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
printWriter.println("<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\" ");
|
||||
printWriter.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" ");
|
||||
printWriter.print("xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns ");
|
||||
printWriter.println("http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">");
|
||||
|
||||
printWriter.println("<graph id=\"G\" edgedefault=\"directed\">");
|
||||
|
||||
while (true) {
|
||||
//Output the graph
|
||||
tuple = stream.read();
|
||||
if (tuple.EOF) {
|
||||
break;
|
||||
}
|
||||
|
||||
String id = tuple.getString("node");
|
||||
|
||||
if (traversal.isMultiCollection()) {
|
||||
id = tuple.getString("collection") + "." + id;
|
||||
}
|
||||
|
||||
writer.write("<node id=\""+replace(id)+"\"");
|
||||
|
||||
List<String> outfields = new ArrayList();
|
||||
Iterator<String> keys = tuple.fields.keySet().iterator();
|
||||
while(keys.hasNext()) {
|
||||
String key = keys.next();
|
||||
if(key.equals("node") || key.equals("ancestors") || key.equals("collection")) {
|
||||
continue;
|
||||
} else {
|
||||
outfields.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
if (outfields.size() > 0) {
|
||||
printWriter.println(">");
|
||||
for (String nodeAttribute : outfields) {
|
||||
Object o = tuple.get(nodeAttribute);
|
||||
if (o != null) {
|
||||
printWriter.println("<data key=\""+nodeAttribute+"\">" + o.toString() + "</data>");
|
||||
}
|
||||
}
|
||||
printWriter.println("</node>");
|
||||
} else {
|
||||
printWriter.println("/>");
|
||||
}
|
||||
|
||||
List<String> ancestors = tuple.getStrings("ancestors");
|
||||
|
||||
if(ancestors != null) {
|
||||
for (String ancestor : ancestors) {
|
||||
++edgeCount;
|
||||
writer.write("<edge id=\"" + edgeCount + "\" ");
|
||||
writer.write(" source=\"" + replace(ancestor) + "\" ");
|
||||
printWriter.println(" target=\"" + replace(id) + "\"/>");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.write("</graph></graphml>");
|
||||
} finally {
|
||||
stream.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String replace(String s) {
|
||||
if(s.indexOf(">") > -1) {
|
||||
s = s.replace(">", ">");
|
||||
}
|
||||
|
||||
if(s.indexOf("<") > -1) {
|
||||
s = s.replace("<", "<");
|
||||
}
|
||||
|
||||
if(s.indexOf("\"")> -1) {
|
||||
s = s.replace("\"", """);
|
||||
}
|
||||
|
||||
if(s.indexOf("'") > -1) {
|
||||
s = s.replace("'", "'");
|
||||
}
|
||||
|
||||
if(s.indexOf("&") > -1) {
|
||||
s = s.replace("&", "&");
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
}
|
|
@ -84,6 +84,13 @@
|
|||
"distrib": false
|
||||
}
|
||||
},
|
||||
"/graph": {
|
||||
"class": "solr.GraphHandler",
|
||||
"invariants": {
|
||||
"wt": "graphml",
|
||||
"distrib": false
|
||||
}
|
||||
},
|
||||
"/stream": {
|
||||
"class": "solr.StreamHandler",
|
||||
"invariants": {
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
package org.apache.solr.response;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.StringWriter;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.io.comp.StreamComparator;
|
||||
import org.apache.solr.client.solrj.io.graph.Traversal;
|
||||
import org.apache.solr.client.solrj.io.stream.TupleStream;
|
||||
import org.apache.solr.client.solrj.io.stream.StreamContext;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestGraphMLResponseWriter extends SolrTestCaseJ4 {
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
System.setProperty("enable.update.log", "false"); // schema12 doesn't support _version_
|
||||
initCore("solrconfig.xml","schema12.xml");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphMLOutput() throws Exception {
|
||||
SolrQueryRequest request = req("blah", "blah"); // Just need a request to attach the stream and traversal to.
|
||||
SolrQueryResponse response = new SolrQueryResponse();
|
||||
Map context = request.getContext();
|
||||
TupleStream stream = new TestStream(); //Simulates a GatherNodesStream
|
||||
Traversal traversal = new Traversal();
|
||||
context.put("traversal", traversal);
|
||||
context.put("stream", stream);
|
||||
StringWriter writer = new StringWriter();
|
||||
|
||||
GraphMLResponseWriter graphMLResponseWriter = new GraphMLResponseWriter();
|
||||
graphMLResponseWriter.write(writer, request, response);
|
||||
String graphML = writer.toString();
|
||||
|
||||
//Validate the nodes
|
||||
String error = h.validateXPath(graphML,
|
||||
"//graph/node[1][@id ='bill']",
|
||||
"//graph/node[2][@id ='jim']",
|
||||
"//graph/node[3][@id ='max']");
|
||||
if(error != null) {
|
||||
throw new Exception(error);
|
||||
}
|
||||
//Validate the edges
|
||||
error = h.validateXPath(graphML,
|
||||
"//graph/edge[1][@source ='jim']",
|
||||
"//graph/edge[1][@target ='bill']",
|
||||
"//graph/edge[2][@source ='max']",
|
||||
"//graph/edge[2][@target ='bill']",
|
||||
"//graph/edge[3][@source ='max']",
|
||||
"//graph/edge[3][@target ='jim']",
|
||||
"//graph/edge[4][@source ='jim']",
|
||||
"//graph/edge[4][@target ='max']"
|
||||
);
|
||||
|
||||
if(error != null) {
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class TestStream extends TupleStream {
|
||||
|
||||
private Iterator<Tuple> tuples;
|
||||
|
||||
public TestStream() {
|
||||
//Create some nodes
|
||||
List<Tuple> testTuples = new ArrayList();
|
||||
Map m1 = new HashMap();
|
||||
|
||||
List<String> an1 = new ArrayList();
|
||||
an1.add("jim");
|
||||
an1.add("max");
|
||||
m1.put("node", "bill");
|
||||
m1.put("ancestors", an1);
|
||||
testTuples.add(new Tuple(m1));
|
||||
|
||||
Map m2 = new HashMap();
|
||||
List<String> an2 = new ArrayList();
|
||||
an2.add("max");
|
||||
m2.put("node", "jim");
|
||||
m2.put("ancestors", an2);
|
||||
testTuples.add(new Tuple(m2));
|
||||
|
||||
Map m3 = new HashMap();
|
||||
List<String> an3 = new ArrayList();
|
||||
an3.add("jim");
|
||||
m3.put("node", "max");
|
||||
m3.put("ancestors", an3);
|
||||
testTuples.add(new Tuple(m3));
|
||||
|
||||
tuples = testTuples.iterator();
|
||||
}
|
||||
|
||||
public StreamComparator getStreamSort() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
public void open() {
|
||||
|
||||
}
|
||||
|
||||
public List<TupleStream> children() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Tuple read() {
|
||||
if(tuples.hasNext()) {
|
||||
return tuples.next();
|
||||
} else {
|
||||
Map map = new HashMap();
|
||||
map.put("EOF", true);
|
||||
return new Tuple(map);
|
||||
}
|
||||
}
|
||||
|
||||
public void setStreamContext(StreamContext streamContext) {
|
||||
|
||||
}
|
||||
|
||||
public Explanation toExplanation(StreamFactory factory) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.io.graph;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -28,6 +30,10 @@ import java.util.Set;
|
|||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
|
||||
import org.apache.solr.client.solrj.io.SolrClientCache;
|
||||
import org.apache.solr.client.solrj.io.Tuple;
|
||||
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
|
||||
|
@ -43,9 +49,12 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
|
|||
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
|
||||
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.cloud.AbstractDistribZkTestBase;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -266,8 +275,8 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
.withFunctionName("max", MaxMetric.class);
|
||||
|
||||
String expr = "gatherNodes(collection1, " +
|
||||
"walk=\"product1->product_s\"," +
|
||||
"gather=\"basket_s\")";
|
||||
"walk=\"product1->product_s\"," +
|
||||
"gather=\"basket_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
stream.setStreamContext(context);
|
||||
|
@ -284,9 +293,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
|
||||
//Test maxDocFreq param
|
||||
String docFreqExpr = "gatherNodes(collection1, " +
|
||||
"walk=\"product1, product7->product_s\"," +
|
||||
"maxDocFreq=\"2\","+
|
||||
"gather=\"basket_s\")";
|
||||
"walk=\"product1, product7->product_s\"," +
|
||||
"maxDocFreq=\"2\","+
|
||||
"gather=\"basket_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(docFreqExpr);
|
||||
stream.setStreamContext(context);
|
||||
|
@ -299,9 +308,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
|
||||
|
||||
String expr2 = "gatherNodes(collection1, " +
|
||||
expr+","+
|
||||
"walk=\"node->basket_s\"," +
|
||||
"gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))";
|
||||
expr+","+
|
||||
"walk=\"node->basket_s\"," +
|
||||
"gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr2);
|
||||
|
||||
|
@ -338,8 +347,8 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
|
||||
//Test list of root nodes
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"walk=\"product4, product7->product_s\"," +
|
||||
"gather=\"basket_s\")";
|
||||
"walk=\"product4, product7->product_s\"," +
|
||||
"gather=\"basket_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
|
||||
|
@ -356,8 +365,8 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
//Test with negative filter query
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"walk=\"product4, product7->product_s\"," +
|
||||
"gather=\"basket_s\", fq=\"-basket_s:basket4\")";
|
||||
"walk=\"product4, product7->product_s\"," +
|
||||
"gather=\"basket_s\", fq=\"-basket_s:basket4\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
|
||||
|
@ -406,8 +415,8 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
.withFunctionName("max", MaxMetric.class);
|
||||
|
||||
String expr = "gatherNodes(collection1, " +
|
||||
"walk=\"bill->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
"walk=\"bill->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
stream.setStreamContext(context);
|
||||
|
@ -423,9 +432,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
//Test scatter branches, leaves and trackTraversal
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"walk=\"bill->from_s\"," +
|
||||
"gather=\"to_s\","+
|
||||
"scatter=\"branches, leaves\", trackTraversal=\"true\")";
|
||||
"walk=\"bill->from_s\"," +
|
||||
"gather=\"to_s\","+
|
||||
"scatter=\"branches, leaves\", trackTraversal=\"true\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
context = new StreamContext();
|
||||
|
@ -461,9 +470,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
// Test query root
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
context = new StreamContext();
|
||||
|
@ -482,9 +491,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
// Test query root scatter branches
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\")";
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr);
|
||||
context = new StreamContext();
|
||||
|
@ -505,14 +514,14 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
assertTrue(tuples.get(3).getLong("level").equals(new Long(1)));
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
|
||||
String expr2 = "gatherNodes(collection1, " +
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr2);
|
||||
context = new StreamContext();
|
||||
|
@ -548,14 +557,14 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\")";
|
||||
|
||||
expr2 = "gatherNodes(collection1, " +
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\")";
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr2);
|
||||
context = new StreamContext();
|
||||
|
@ -589,14 +598,14 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
.commit(cluster.getSolrClient(), COLLECTION);
|
||||
|
||||
expr = "gatherNodes(collection1, " +
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\", trackTraversal=\"true\")";
|
||||
"search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
|
||||
"walk=\"from_s->from_s\"," +
|
||||
"gather=\"to_s\", trackTraversal=\"true\")";
|
||||
|
||||
expr2 = "gatherNodes(collection1, " +
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")";
|
||||
expr+","+
|
||||
"walk=\"node->from_s\"," +
|
||||
"gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")";
|
||||
|
||||
stream = (GatherNodesStream)factory.constructStream(expr2);
|
||||
context = new StreamContext();
|
||||
|
@ -634,6 +643,84 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGraphHandler() throws Exception {
|
||||
|
||||
|
||||
new UpdateRequest()
|
||||
.add(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim")
|
||||
.add(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam")
|
||||
.add(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max")
|
||||
.add(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip")
|
||||
.add(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve")
|
||||
.add(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve")
|
||||
.commit(cluster.getSolrClient(), COLLECTION);
|
||||
|
||||
commit();
|
||||
|
||||
List<JettySolrRunner> runners = cluster.getJettySolrRunners();
|
||||
JettySolrRunner runner = runners.get(0);
|
||||
String url = runner.getBaseUrl().toString();
|
||||
|
||||
HttpSolrClient client = new HttpSolrClient(url);
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
|
||||
|
||||
String expr = "sort(by=\"node asc\", gatherNodes(collection1, " +
|
||||
"walk=\"bill->from_s\"," +
|
||||
"trackTraversal=\"true\"," +
|
||||
"gather=\"to_s\"))";
|
||||
|
||||
params.add("expr", expr);
|
||||
QueryRequest query = new QueryRequest(params);
|
||||
query.setPath("/collection1/graph");
|
||||
|
||||
query.setResponseParser(new InputStreamResponseParser("xml"));
|
||||
query.setMethod(SolrRequest.METHOD.POST);
|
||||
|
||||
NamedList<Object> genericResponse = client.request(query);
|
||||
|
||||
|
||||
InputStream stream = (InputStream)genericResponse.get("stream");
|
||||
InputStreamReader reader = new InputStreamReader(stream, "UTF-8");
|
||||
String xml = readString(reader);
|
||||
//Validate the nodes
|
||||
String error = h.validateXPath(xml,
|
||||
"//graph/node[1][@id ='jim']",
|
||||
"//graph/node[2][@id ='max']",
|
||||
"//graph/node[3][@id ='sam']");
|
||||
if(error != null) {
|
||||
throw new Exception(error);
|
||||
}
|
||||
//Validate the edges
|
||||
error = h.validateXPath(xml,
|
||||
"//graph/edge[1][@source ='bill']",
|
||||
"//graph/edge[1][@target ='jim']",
|
||||
"//graph/edge[2][@source ='bill']",
|
||||
"//graph/edge[2][@target ='max']",
|
||||
"//graph/edge[3][@source ='bill']",
|
||||
"//graph/edge[3][@target ='sam']");
|
||||
|
||||
if(error != null) {
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
client.close();
|
||||
}
|
||||
|
||||
private String readString(InputStreamReader reader) throws Exception{
|
||||
StringBuilder builder = new StringBuilder();
|
||||
int c = 0;
|
||||
while((c = reader.read()) != -1) {
|
||||
builder.append(((char)c));
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
|
||||
tupleStream.open();
|
||||
List<Tuple> tuples = new ArrayList();
|
||||
|
@ -675,7 +762,9 @@ public class GraphExpressionTest extends SolrCloudTestCase {
|
|||
throw new Exception("Longs not equal:"+expected+" : "+actual);
|
||||
}
|
||||
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -175,7 +175,6 @@ public class StreamExpressionTest extends SolrCloudTestCase {
|
|||
assert(tuples.size() == 3);
|
||||
assertOrder(tuples, 0, 3, 4);
|
||||
assertLong(tuples.get(1), "a_i", 3);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue