From be1cb9a1cde4dd426305f22620734d018f21dd82 Mon Sep 17 00:00:00 2001 From: jbernste Date: Thu, 5 May 2016 14:27:05 -0400 Subject: [PATCH] SOLR-8972: Add GraphHandler and GraphMLResponseWriter to support graph visualizations --- .../java/org/apache/solr/core/SolrCore.java | 1 + .../org/apache/solr/handler/GraphHandler.java | 282 ++++++++++++++++++ .../solr/response/GraphMLResponseWriter.java | 167 +++++++++++ solr/core/src/resources/ImplicitPlugins.json | 7 + .../response/TestGraphMLResponseWriter.java | 155 ++++++++++ .../solrj/io/graph/GraphExpressionTest.java | 173 ++++++++--- .../solrj/io/stream/StreamExpressionTest.java | 1 - 7 files changed, 743 insertions(+), 43 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/handler/GraphHandler.java create mode 100644 solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java create mode 100644 solr/core/src/test/org/apache/solr/response/TestGraphMLResponseWriter.java diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index b94b3d8e17a..d5cde1669a9 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -2111,6 +2111,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { m.put("standard", m.get("xml")); m.put(CommonParams.JSON, new JSONResponseWriter()); m.put("geojson", new GeoJSONResponseWriter()); + m.put("graphml", new GraphMLResponseWriter()); m.put("python", new PythonResponseWriter()); m.put("php", new PHPResponseWriter()); m.put("phps", new PHPSerializedResponseWriter()); diff --git a/solr/core/src/java/org/apache/solr/handler/GraphHandler.java b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java new file mode 100644 index 00000000000..a6e2ce149bf --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/GraphHandler.java @@ -0,0 +1,282 @@ +package org.apache.solr.handler; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.graph.GatherNodesStream;
+import org.apache.solr.client.solrj.io.graph.ShortestPathStream;
+import org.apache.solr.client.solrj.io.graph.Traversal;
+import org.apache.solr.client.solrj.io.ops.ConcatOperation;
+import org.apache.solr.client.solrj.io.ops.DistinctOperation;
+import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.ops.ReplaceOperation;
+import org.apache.solr.client.solrj.io.stream.*;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.AuthorizationContext;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
+
+  private StreamFactory streamFactory = new StreamFactory();
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private String coreName;
+
+  @Override
+  public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
+    return PermissionNameProvider.Name.READ_PERM;
+  }
+
+  public void inform(SolrCore core) {
+
+    /* The stream factory will always contain the zkUrl for the given collection
+     * Adds default streams with their corresponding function names. These
+     * defaults can be overridden or added to in the solrConfig in the stream
+     * RequestHandler def.
+     * Example config override
+     *  <lst name="streamFunctions">
+     *    <str name="group">org.apache.solr.client.solrj.io.stream.ReducerStream</str>
+     *    <str name="count">org.apache.solr.client.solrj.io.stream.RecordCountStream</str>
+     *  </lst>
+     * */
+
+    String defaultCollection = null;
+    String defaultZkhost = null;
+    CoreContainer coreContainer = core.getCoreDescriptor().getCoreContainer();
+    this.coreName = core.getName();
+
+    if(coreContainer.isZooKeeperAware()) {
+      defaultCollection = core.getCoreDescriptor().getCollectionName();
+      defaultZkhost = core.getCoreDescriptor().getCoreContainer().getZkController().getZkServerAddress();
+      streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
+      streamFactory.withDefaultZkHost(defaultZkhost);
+    }
+
+    streamFactory
+        // streams
+        .withFunctionName("search", CloudSolrStream.class)
+        .withFunctionName("merge", MergeStream.class)
+        .withFunctionName("unique", UniqueStream.class)
+        .withFunctionName("top", RankStream.class)
+        .withFunctionName("group", GroupOperation.class)
+        .withFunctionName("reduce", ReducerStream.class)
+        .withFunctionName("parallel", ParallelStream.class)
+        .withFunctionName("rollup", RollupStream.class)
+        .withFunctionName("stats", StatsStream.class)
+        .withFunctionName("innerJoin", InnerJoinStream.class)
+        .withFunctionName("leftOuterJoin", LeftOuterJoinStream.class)
+        .withFunctionName("hashJoin", HashJoinStream.class)
+        .withFunctionName("outerHashJoin", OuterHashJoinStream.class)
+        .withFunctionName("facet", FacetStream.class)
+        .withFunctionName("update", UpdateStream.class)
+        .withFunctionName("jdbc", JDBCStream.class)
+        .withFunctionName("intersect", IntersectStream.class)
+        .withFunctionName("complement", ComplementStream.class)
+        .withFunctionName("daemon", DaemonStream.class)
+        .withFunctionName("topic", TopicStream.class)
+        .withFunctionName("shortestPath", ShortestPathStream.class)
+        .withFunctionName("gatherNodes", GatherNodesStream.class)
+        .withFunctionName("sort", SortStream.class)
+
+
+        // metrics
+        .withFunctionName("min", MinMetric.class)
+        .withFunctionName("max", MaxMetric.class)
+        .withFunctionName("avg", MeanMetric.class)
+        .withFunctionName("sum", SumMetric.class)
+        .withFunctionName("count", CountMetric.class)
+
+        // tuple manipulation operations
+        .withFunctionName("replace", ReplaceOperation.class)
+        .withFunctionName("concat", ConcatOperation.class)
+
+        // stream reduction operations
+        .withFunctionName("group", GroupOperation.class)
+        .withFunctionName("distinct", DistinctOperation.class);
+
+    // This pulls all the overrides and additions from the config
+    Object functionMappingsObj = initArgs.get("streamFunctions");
+    if(null != functionMappingsObj){
+      NamedList<String> functionMappings = (NamedList<String>)functionMappingsObj;
+      for(Entry<String,String> functionMapping : functionMappings){
+        Class clazz = core.getResourceLoader().findClass((String)functionMapping.getValue(), Expressible.class);
+        streamFactory.withFunctionName(functionMapping.getKey(), clazz);
+      }
+    }
+  }
+
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    SolrParams params = req.getParams();
+    params = adjustParams(params);
+    req.setParams(params);
+
+
+    TupleStream tupleStream = null;
+
+    try {
+      tupleStream = this.streamFactory.constructStream(params.get("expr"));
+    } catch (Exception e) {
+      //Catch exceptions that occur while the stream is being created. This will include streaming expression parse rules.
+      SolrException.log(logger, e);
+      Map requestContext = req.getContext();
+      requestContext.put("stream", new DummyErrorStream(e));
+      return;
+    }
+
+    StreamContext context = new StreamContext();
+    context.setSolrClientCache(StreamHandler.clientCache);
+    context.put("core", this.coreName);
+    Traversal traversal = new Traversal();
+    context.put("traversal", traversal);
+    tupleStream.setStreamContext(context);
+    Map requestContext = req.getContext();
+    requestContext.put("stream", new TimerStream(new ExceptionStream(tupleStream)));
+    requestContext.put("traversal", traversal);
+  }
+
+  public String getDescription() {
+    return "StreamHandler";
+  }
+
+  public String getSource() {
+    return null;
+  }
+
+
+  public static class DummyErrorStream extends TupleStream {
+    private Exception e;
+
+    public DummyErrorStream(Exception e) {
+      this.e = e;
+    }
+    public StreamComparator getStreamSort() {
+      return null;
+    }
+
+    public void close() {
+    }
+
+    public void open() {
+    }
+
+    public Exception getException() {
+      return this.e;
+    }
+
+    public void setStreamContext(StreamContext context) {
+    }
+
+    public List children() {
+      return null;
+    }
+
+    @Override
+    public Explanation toExplanation(StreamFactory factory) throws IOException {
+      return null;
+    }
+
+    public Tuple read() {
+      String msg = e.getMessage();
+      Map m = new HashMap();
+      m.put("EOF", true);
+      m.put("EXCEPTION", msg);
+      return new Tuple(m);
+    }
+  }
+
+
+  private SolrParams adjustParams(SolrParams params) {
+    ModifiableSolrParams adjustedParams = new ModifiableSolrParams();
+    adjustedParams.add(params);
+    adjustedParams.add(CommonParams.OMIT_HEADER, "true");
+    return adjustedParams;
+  }
+
+  public static class TimerStream extends TupleStream {
+
+    private long begin;
+    private TupleStream tupleStream;
+
+    public TimerStream(TupleStream tupleStream) {
+      this.tupleStream = tupleStream;
+    }
+
+    public StreamComparator getStreamSort() {
+      return this.tupleStream.getStreamSort();
+    }
+
+    public void close() throws IOException {
+      this.tupleStream.close();
+    }
+
+    public void open() throws IOException {
+      this.begin = System.nanoTime();
+      this.tupleStream.open();
+    }
+
+    public void setStreamContext(StreamContext context) {
+      this.tupleStream.setStreamContext(context);
+    }
+
+    public List children() {
+      return this.tupleStream.children();
+    }
+
+    @Override
+    public Explanation toExplanation(StreamFactory factory) throws IOException {
+      return null;
+    }
+
+    public Tuple read() throws IOException {
+      Tuple tuple = this.tupleStream.read();
+      if(tuple.EOF) {
+        long totalTime = (System.nanoTime() - begin) / 1000000;
+        tuple.fields.put("RESPONSE_TIME", totalTime);
+      }
+      return tuple;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java
new file mode 100644
index 00000000000..d941991c2b2
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/response/GraphMLResponseWriter.java
@@ -0,0 +1,167 @@
+package org.apache.solr.response;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.solr.client.solrj.io.graph.Traversal;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.handler.GraphHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class GraphMLResponseWriter implements QueryResponseWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public void init(NamedList args) {
+    /* NOOP */
+  }
+
+  public String getContentType(SolrQueryRequest req, SolrQueryResponse res) {
+    return "application/xml";
+  }
+
+  public void write(Writer writer, SolrQueryRequest req, SolrQueryResponse res) throws IOException {
+
+    Exception e1 = res.getException();
+    if(e1 != null) {
+      e1.printStackTrace(new PrintWriter(writer));
+      return;
+    }
+
+    TupleStream stream = (TupleStream)req.getContext().get("stream");
+
+    if(stream instanceof GraphHandler.DummyErrorStream) {
+      GraphHandler.DummyErrorStream d = (GraphHandler.DummyErrorStream)stream;
+      Exception e = d.getException();
+      e.printStackTrace(new PrintWriter(writer));
+      return;
+    }
+
+
+    Traversal traversal = (Traversal)req.getContext().get("traversal");
+    PrintWriter printWriter = new PrintWriter(writer);
+
+    try {
+
+      stream.open();
+
+      Tuple tuple = null;
+
+      int edgeCount = 0;
+
+      printWriter.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+      printWriter.println("<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\" " +
+                          "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " +
+                          "xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">");
+
+      printWriter.println("<graph id=\"G\" edgedefault=\"directed\">");
+
+      while (true) {
+        //Output the graph
+        tuple = stream.read();
+        if (tuple.EOF) {
+          break;
+        }
+
+        String id = tuple.getString("node");
+
+        if (traversal.isMultiCollection()) {
+          id = tuple.getString("collection") + "." + id;
+        }
+
+        writer.write("<node id=\"" + replace(id) + "\"");
+
+        List<String> outfields = new ArrayList();
+        Iterator<String> keys = tuple.fields.keySet().iterator();
+        while(keys.hasNext()) {
+          String key = keys.next();
+          if(key.equals("node") || key.equals("ancestors") || key.equals("collection")) {
+            continue;
+          } else {
+            outfields.add(key);
+          }
+        }
+
+        if (outfields.size() > 0) {
+          printWriter.println(">");
+          for (String nodeAttribute : outfields) {
+            Object o = tuple.get(nodeAttribute);
+            if (o != null) {
+              printWriter.println("<data key=\"" + nodeAttribute + "\">" + o.toString() + "</data>");
+            }
+          }
+          printWriter.println("</node>");
+        } else {
+          printWriter.println("/>");
+        }
+
+        List<String> ancestors = tuple.getStrings("ancestors");
+
+        if(ancestors != null) {
+          for (String ancestor : ancestors) {
+            ++edgeCount;
+            writer.write("<edge id=\"" + edgeCount + "\" source=\"" + replace(ancestor) + "\" target=\"" + replace(id) + "\"/>");
+          }
+        }
+      }
+
+      writer.write("</graph></graphml>");
+    } finally {
+      stream.close();
+    }
+  }
+
+  private String replace(String s) {
+    if(s.indexOf(">") > -1) {
+      s = s.replace(">", "&gt;");
+    }
+
+    if(s.indexOf("<") > -1) {
+      s = s.replace("<", "&lt;");
+    }
+
+    if(s.indexOf("\"") > -1) {
+      s = s.replace("\"", "&quot;");
+    }
+
+    if(s.indexOf("'") > -1) {
+      s = s.replace("'", "&apos;");
+    }
+
+    if(s.indexOf("&") > -1) {
+      s = s.replace("&", "&amp;");
+    }
+
+    return s;
+  }
+}
\ No newline at end of file
diff --git a/solr/core/src/resources/ImplicitPlugins.json b/solr/core/src/resources/ImplicitPlugins.json
index 8c0549a2573..325bf913cee 100644
--- a/solr/core/src/resources/ImplicitPlugins.json
+++ b/solr/core/src/resources/ImplicitPlugins.json
@@ -84,6 +84,13 @@
       "distrib": false
     }
   },
+  "/graph": {
+    "class": "solr.GraphHandler",
+    "invariants": {
+      "wt": "graphml",
+      "distrib": false
+    }
+  },
   "/stream": {
     "class": "solr.StreamHandler",
     "invariants": {
diff --git a/solr/core/src/test/org/apache/solr/response/TestGraphMLResponseWriter.java b/solr/core/src/test/org/apache/solr/response/TestGraphMLResponseWriter.java
new file mode 100644
index 00000000000..283f64d3cac
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/response/TestGraphMLResponseWriter.java
@@ -0,0 +1,155 @@
+package org.apache.solr.response;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +import java.io.StringWriter; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.graph.Traversal; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.request.SolrQueryRequest; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestGraphMLResponseWriter extends SolrTestCaseJ4 { + @BeforeClass + public static void beforeClass() throws Exception { + System.setProperty("enable.update.log", "false"); // schema12 doesn't support _version_ + initCore("solrconfig.xml","schema12.xml"); + } + + @Test + public void testGraphMLOutput() throws Exception { + SolrQueryRequest request = req("blah", "blah"); // Just need a request to attach the stream and traversal to. + SolrQueryResponse response = new SolrQueryResponse(); + Map context = request.getContext(); + TupleStream stream = new TestStream(); //Simulates a GatherNodesStream + Traversal traversal = new Traversal(); + context.put("traversal", traversal); + context.put("stream", stream); + StringWriter writer = new StringWriter(); + + GraphMLResponseWriter graphMLResponseWriter = new GraphMLResponseWriter(); + graphMLResponseWriter.write(writer, request, response); + String graphML = writer.toString(); + + //Validate the nodes + String error = h.validateXPath(graphML, + "//graph/node[1][@id ='bill']", + "//graph/node[2][@id ='jim']", + "//graph/node[3][@id ='max']"); + if(error != null) { + throw new Exception(error); + } + //Validate the edges + error = h.validateXPath(graphML, + "//graph/edge[1][@source ='jim']", + "//graph/edge[1][@target ='bill']", + "//graph/edge[2][@source ='max']", + "//graph/edge[2][@target ='bill']", + "//graph/edge[3][@source ='max']", + "//graph/edge[3][@target ='jim']", + "//graph/edge[4][@source ='jim']", + "//graph/edge[4][@target ='max']" + ); + + if(error != null) { + throw new Exception(error); + } + + } + + private class TestStream extends TupleStream { + + private Iterator tuples; + + public TestStream() { + //Create some nodes + List testTuples = new ArrayList(); + Map m1 = new HashMap(); + + List an1 = new ArrayList(); + an1.add("jim"); + an1.add("max"); + m1.put("node", "bill"); + m1.put("ancestors", an1); + testTuples.add(new Tuple(m1)); + + Map m2 = new HashMap(); + List an2 = new ArrayList(); + an2.add("max"); + m2.put("node", "jim"); + m2.put("ancestors", an2); + testTuples.add(new Tuple(m2)); + + Map m3 = new HashMap(); + List an3 = new ArrayList(); + an3.add("jim"); + m3.put("node", "max"); + m3.put("ancestors", an3); + testTuples.add(new Tuple(m3)); + + tuples = testTuples.iterator(); + } + + public StreamComparator getStreamSort() { + return null; + } + + public void close() { + + } + + public void open() { + + } + + public List children() { + return null; + } + + public Tuple read() { + if(tuples.hasNext()) { + return tuples.next(); + } else { + Map map = new HashMap(); + map.put("EOF", true); + return new Tuple(map); + } + } + + public void setStreamContext(StreamContext streamContext) { + + } + + public Explanation toExplanation(StreamFactory factory) { + return null; + } + + } +} diff 
--git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java index 53f71265910..c429fe806ab 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/graph/GraphExpressionTest.java @@ -18,6 +18,8 @@ package org.apache.solr.client.solrj.io.graph; */ import java.io.IOException; +import java.io.InputStreamReader; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -28,6 +30,10 @@ import java.util.Set; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.InputStreamResponseParser; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.ComparatorOrder; @@ -43,9 +49,12 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.cloud.AbstractDistribZkTestBase; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -266,8 +275,8 @@ public class GraphExpressionTest extends SolrCloudTestCase { .withFunctionName("max", MaxMetric.class); String expr = "gatherNodes(collection1, " + - "walk=\"product1->product_s\"," + - "gather=\"basket_s\")"; + "walk=\"product1->product_s\"," + + "gather=\"basket_s\")"; stream = (GatherNodesStream)factory.constructStream(expr); stream.setStreamContext(context); @@ -284,9 +293,9 @@ public class GraphExpressionTest extends SolrCloudTestCase { //Test maxDocFreq param String docFreqExpr = "gatherNodes(collection1, " + - "walk=\"product1, product7->product_s\"," + - "maxDocFreq=\"2\","+ - "gather=\"basket_s\")"; + "walk=\"product1, product7->product_s\"," + + "maxDocFreq=\"2\","+ + "gather=\"basket_s\")"; stream = (GatherNodesStream)factory.constructStream(docFreqExpr); stream.setStreamContext(context); @@ -299,9 +308,9 @@ public class GraphExpressionTest extends SolrCloudTestCase { String expr2 = "gatherNodes(collection1, " + - expr+","+ - "walk=\"node->basket_s\"," + - "gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))"; + expr+","+ + "walk=\"node->basket_s\"," + + "gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))"; stream = (GatherNodesStream)factory.constructStream(expr2); @@ -338,8 +347,8 @@ public class GraphExpressionTest extends SolrCloudTestCase { //Test list of root nodes expr = "gatherNodes(collection1, " + - "walk=\"product4, product7->product_s\"," + - "gather=\"basket_s\")"; + "walk=\"product4, product7->product_s\"," + + "gather=\"basket_s\")"; stream = (GatherNodesStream)factory.constructStream(expr); @@ -356,8 +365,8 @@ public class 
GraphExpressionTest extends SolrCloudTestCase { //Test with negative filter query expr = "gatherNodes(collection1, " + - "walk=\"product4, product7->product_s\"," + - "gather=\"basket_s\", fq=\"-basket_s:basket4\")"; + "walk=\"product4, product7->product_s\"," + + "gather=\"basket_s\", fq=\"-basket_s:basket4\")"; stream = (GatherNodesStream)factory.constructStream(expr); @@ -406,8 +415,8 @@ public class GraphExpressionTest extends SolrCloudTestCase { .withFunctionName("max", MaxMetric.class); String expr = "gatherNodes(collection1, " + - "walk=\"bill->from_s\"," + - "gather=\"to_s\")"; + "walk=\"bill->from_s\"," + + "gather=\"to_s\")"; stream = (GatherNodesStream)factory.constructStream(expr); stream.setStreamContext(context); @@ -423,9 +432,9 @@ public class GraphExpressionTest extends SolrCloudTestCase { //Test scatter branches, leaves and trackTraversal expr = "gatherNodes(collection1, " + - "walk=\"bill->from_s\"," + - "gather=\"to_s\","+ - "scatter=\"branches, leaves\", trackTraversal=\"true\")"; + "walk=\"bill->from_s\"," + + "gather=\"to_s\","+ + "scatter=\"branches, leaves\", trackTraversal=\"true\")"; stream = (GatherNodesStream)factory.constructStream(expr); context = new StreamContext(); @@ -461,9 +470,9 @@ public class GraphExpressionTest extends SolrCloudTestCase { // Test query root expr = "gatherNodes(collection1, " + - "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ - "walk=\"from_s->from_s\"," + - "gather=\"to_s\")"; + "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ + "walk=\"from_s->from_s\"," + + "gather=\"to_s\")"; stream = (GatherNodesStream)factory.constructStream(expr); context = new StreamContext(); @@ -482,9 +491,9 @@ public class GraphExpressionTest extends SolrCloudTestCase { // Test query root scatter branches expr = "gatherNodes(collection1, " + - "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ - "walk=\"from_s->from_s\"," + - "gather=\"to_s\", scatter=\"branches, leaves\")"; + "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ + "walk=\"from_s->from_s\"," + + "gather=\"to_s\", scatter=\"branches, leaves\")"; stream = (GatherNodesStream)factory.constructStream(expr); context = new StreamContext(); @@ -505,14 +514,14 @@ public class GraphExpressionTest extends SolrCloudTestCase { assertTrue(tuples.get(3).getLong("level").equals(new Long(1))); expr = "gatherNodes(collection1, " + - "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ - "walk=\"from_s->from_s\"," + - "gather=\"to_s\")"; + "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ + "walk=\"from_s->from_s\"," + + "gather=\"to_s\")"; String expr2 = "gatherNodes(collection1, " + - expr+","+ - "walk=\"node->from_s\"," + - "gather=\"to_s\")"; + expr+","+ + "walk=\"node->from_s\"," + + "gather=\"to_s\")"; stream = (GatherNodesStream)factory.constructStream(expr2); context = new StreamContext(); @@ -548,14 +557,14 @@ public class GraphExpressionTest extends SolrCloudTestCase { expr = "gatherNodes(collection1, " + - "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ - "walk=\"from_s->from_s\"," + - "gather=\"to_s\")"; + "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ + "walk=\"from_s->from_s\"," + + "gather=\"to_s\")"; expr2 = "gatherNodes(collection1, " + - expr+","+ - "walk=\"node->from_s\"," + - "gather=\"to_s\", scatter=\"branches, leaves\")"; + expr+","+ + 
"walk=\"node->from_s\"," + + "gather=\"to_s\", scatter=\"branches, leaves\")"; stream = (GatherNodesStream)factory.constructStream(expr2); context = new StreamContext(); @@ -589,14 +598,14 @@ public class GraphExpressionTest extends SolrCloudTestCase { .commit(cluster.getSolrClient(), COLLECTION); expr = "gatherNodes(collection1, " + - "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ - "walk=\"from_s->from_s\"," + - "gather=\"to_s\", trackTraversal=\"true\")"; + "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+ + "walk=\"from_s->from_s\"," + + "gather=\"to_s\", trackTraversal=\"true\")"; expr2 = "gatherNodes(collection1, " + - expr+","+ - "walk=\"node->from_s\"," + - "gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")"; + expr+","+ + "walk=\"node->from_s\"," + + "gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")"; stream = (GatherNodesStream)factory.constructStream(expr2); context = new StreamContext(); @@ -634,6 +643,84 @@ public class GraphExpressionTest extends SolrCloudTestCase { } + @Test + public void testGraphHandler() throws Exception { + + + new UpdateRequest() + .add(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim") + .add(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam") + .add(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max") + .add(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip") + .add(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve") + .add(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve") + .commit(cluster.getSolrClient(), COLLECTION); + + commit(); + + List runners = cluster.getJettySolrRunners(); + JettySolrRunner runner = runners.get(0); + String url = runner.getBaseUrl().toString(); + + HttpSolrClient client = new HttpSolrClient(url); + ModifiableSolrParams params = new ModifiableSolrParams(); + + + String expr = "sort(by=\"node asc\", gatherNodes(collection1, " + + "walk=\"bill->from_s\"," + + "trackTraversal=\"true\"," + + "gather=\"to_s\"))"; + + params.add("expr", expr); + QueryRequest query = new QueryRequest(params); + query.setPath("/collection1/graph"); + + query.setResponseParser(new InputStreamResponseParser("xml")); + query.setMethod(SolrRequest.METHOD.POST); + + NamedList genericResponse = client.request(query); + + + InputStream stream = (InputStream)genericResponse.get("stream"); + InputStreamReader reader = new InputStreamReader(stream, "UTF-8"); + String xml = readString(reader); + //Validate the nodes + String error = h.validateXPath(xml, + "//graph/node[1][@id ='jim']", + "//graph/node[2][@id ='max']", + "//graph/node[3][@id ='sam']"); + if(error != null) { + throw new Exception(error); + } + //Validate the edges + error = h.validateXPath(xml, + "//graph/edge[1][@source ='bill']", + "//graph/edge[1][@target ='jim']", + "//graph/edge[2][@source ='bill']", + "//graph/edge[2][@target ='max']", + "//graph/edge[3][@source ='bill']", + "//graph/edge[3][@target ='sam']"); + + if(error != null) { + throw new Exception(error); + } + + client.close(); + } + + private String readString(InputStreamReader reader) throws Exception{ + StringBuilder builder = new StringBuilder(); + int c = 0; + while((c = reader.read()) != -1) { + builder.append(((char)c)); + } + + return builder.toString(); + } + + + + protected List getTuples(TupleStream tupleStream) throws IOException { tupleStream.open(); List tuples = new ArrayList(); @@ -675,7 +762,9 @@ public 
class GraphExpressionTest extends SolrCloudTestCase { throw new Exception("Longs not equal:"+expected+" : "+actual); } + + return true; } -} +} \ No newline at end of file diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java index 9a0653a3930..f0b7b7b907d 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamExpressionTest.java @@ -175,7 +175,6 @@ public class StreamExpressionTest extends SolrCloudTestCase { assert(tuples.size() == 3); assertOrder(tuples, 0, 3, 4); assertLong(tuples.get(1), "a_i", 3); - } @Test
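For reference, the /graph endpoint registered in ImplicitPlugins.json above (wt=graphml is an invariant) returns a GraphML document for the streaming expression posted as the expr parameter, as testGraphHandler does via SolrJ. The sketch below shows roughly what the writer emits for that test's gatherNodes() traversal; it is an illustration reconstructed from the writer and the test fixtures, not part of the patch, and the exact attributes on the graphml root, edge ids, and data keys may differ. Node ids come from each tuple's "node" field, remaining tuple fields become data elements, and the "ancestors" field becomes directed edges.

    <?xml version="1.0" encoding="UTF-8"?>
    <graphml xmlns="http://graphml.graphdrawing.org/xmlns">
      <graph id="G" edgedefault="directed">
        <node id="jim">
          <data key="level">1</data>
        </node>
        <node id="max">
          <data key="level">1</data>
        </node>
        <node id="sam">
          <data key="level">1</data>
        </node>
        <edge id="1" source="bill" target="jim"/>
        <edge id="2" source="bill" target="max"/>
        <edge id="3" source="bill" target="sam"/>
      </graph>
    </graphml>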