This commit is contained in:
Karl Wright 2016-04-19 08:28:36 -04:00
commit 46062fbd86
28 changed files with 1853 additions and 116 deletions

View File

@ -61,7 +61,7 @@ Optimizations
multiple polygons and holes, with memory usage independent of multiple polygons and holes, with memory usage independent of
polygon complexity. (Karl Wright, Mike McCandless, Robert Muir) polygon complexity. (Karl Wright, Mike McCandless, Robert Muir)
* LUCENE-7159, LUCENE-7222: Speed up LatLonPoint polygon performance for complex * LUCENE-7159, LUCENE-7222, LUCENE-7229: Speed up LatLonPoint polygon performance for complex
polygons. (Robert Muir) polygons. (Robert Muir)
* LUCENE-7211: Reduce memory & GC for spatial RPT Intersects when the number of * LUCENE-7211: Reduce memory & GC for spatial RPT Intersects when the number of

View File

@ -48,9 +48,6 @@ public final class Polygon {
/** maximum longitude of this polygon's bounding box area */ /** maximum longitude of this polygon's bounding box area */
public final double maxLon; public final double maxLon;
// TODO: refactor to GeoUtils once LUCENE-7165 is complete
private static final double ENCODING_TOLERANCE = 1e-6;
// TODO: we could also compute the maximal inner bounding box, to make relations faster to compute? // TODO: we could also compute the maximal inner bounding box, to make relations faster to compute?
/** /**
@ -234,70 +231,79 @@ public final class Polygon {
return containsCount; return containsCount;
} }
private boolean crossesSlowly(double minLat, double maxLat, final double minLon, final double maxLon) { /** Returns true if the box crosses our polygon */
/* private boolean crossesSlowly(double minLat, double maxLat, double minLon, double maxLon) {
* Accurately compute (within restrictions of cartesian decimal degrees) whether a rectangle crosses a polygon // we compute line intersections of every polygon edge with every box line.
*/ // if we find one, return true.
final double[] boxLats = new double[] { minLat, minLat, maxLat, maxLat, minLat }; // for each box line (AB):
final double[] boxLons = new double[] { minLon, maxLon, maxLon, minLon, minLon }; // for each poly line (CD):
// intersects = orient(C,D,A) * orient(C,D,B) <= 0 && orient(A,B,C) * orient(A,B,D) <= 0
for (int i = 1; i < polyLons.length; i++) {
double cy = polyLats[i - 1];
double dy = polyLats[i];
double cx = polyLons[i - 1];
double dx = polyLons[i];
// computes the intersection point between each bbox edge and the polygon edge // optimization: see if the rectangle is outside of the "bounding box" of the polyline at all
for (int b=0; b<4; ++b) { // if not, don't waste our time trying more complicated stuff
double a1 = boxLats[b+1]-boxLats[b]; if ((cy < minLat && dy < minLat) ||
double b1 = boxLons[b]-boxLons[b+1]; (cy > maxLat && dy > maxLat) ||
double c1 = a1*boxLons[b+1] + b1*boxLats[b+1]; (cx < minLon && dx < minLon) ||
for (int p=0; p<polyLons.length-1; ++p) { (cx > maxLon && dx > maxLon)) {
double a2 = polyLats[p+1]-polyLats[p]; continue;
double b2 = polyLons[p]-polyLons[p+1];
// compute determinant
double d = a1*b2 - a2*b1;
if (d != 0) {
// lines are not parallel, check intersecting points
double c2 = a2*polyLons[p+1] + b2*polyLats[p+1];
double s = (1/d)*(b2*c1 - b1*c2);
// todo TOLERANCE SHOULD MATCH EVERYWHERE this is currently blocked by LUCENE-7165
double x00 = Math.min(boxLons[b], boxLons[b+1]) - ENCODING_TOLERANCE;
if (x00 > s) {
continue; // out of range
}
double x01 = Math.max(boxLons[b], boxLons[b+1]) + ENCODING_TOLERANCE;
if (x01 < s) {
continue; // out of range
}
double x10 = Math.min(polyLons[p], polyLons[p+1]) - ENCODING_TOLERANCE;
if (x10 > s) {
continue; // out of range
}
double x11 = Math.max(polyLons[p], polyLons[p+1]) + ENCODING_TOLERANCE;
if (x11 < s) {
continue; // out of range
} }
double t = (1/d)*(a1*c2 - a2*c1); // does box's top edge intersect polyline?
double y00 = Math.min(boxLats[b], boxLats[b+1]) - ENCODING_TOLERANCE; // ax = minLon, bx = maxLon, ay = maxLat, by = maxLat
if (y00 > t || (x00 == s && y00 == t)) { if (orient(cx, cy, dx, dy, minLon, maxLat) * orient(cx, cy, dx, dy, maxLon, maxLat) <= 0 &&
continue; // out of range or touching orient(minLon, maxLat, maxLon, maxLat, cx, cy) * orient(minLon, maxLat, maxLon, maxLat, dx, dy) <= 0) {
}
double y01 = Math.max(boxLats[b], boxLats[b+1]) + ENCODING_TOLERANCE;
if (y01 < t || (x01 == s && y01 == t)) {
continue; // out of range or touching
}
double y10 = Math.min(polyLats[p], polyLats[p+1]) - ENCODING_TOLERANCE;
if (y10 > t || (x10 == s && y10 == t)) {
continue; // out of range or touching
}
double y11 = Math.max(polyLats[p], polyLats[p+1]) + ENCODING_TOLERANCE;
if (y11 < t || (x11 == s && y11 == t)) {
continue; // out of range or touching
}
// if line segments are not touching and the intersection point is within the range of either segment
return true; return true;
} }
} // for each poly edge
} // for each bbox edge // does box's right edge intersect polyline?
// ax = maxLon, bx = maxLon, ay = maxLat, by = minLat
if (orient(cx, cy, dx, dy, maxLon, maxLat) * orient(cx, cy, dx, dy, maxLon, minLat) <= 0 &&
orient(maxLon, maxLat, maxLon, minLat, cx, cy) * orient(maxLon, maxLat, maxLon, minLat, dx, dy) <= 0) {
return true;
}
// does box's bottom edge intersect polyline?
// ax = maxLon, bx = minLon, ay = minLat, by = minLat
if (orient(cx, cy, dx, dy, maxLon, minLat) * orient(cx, cy, dx, dy, minLon, minLat) <= 0 &&
orient(maxLon, minLat, minLon, minLat, cx, cy) * orient(maxLon, minLat, minLon, minLat, dx, dy) <= 0) {
return true;
}
// does box's left edge intersect polyline?
// ax = minLon, bx = minLon, ay = minLat, by = maxLat
if (orient(cx, cy, dx, dy, minLon, minLat) * orient(cx, cy, dx, dy, minLon, maxLat) <= 0 &&
orient(minLon, minLat, minLon, maxLat, cx, cy) * orient(minLon, minLat, minLon, maxLat, dx, dy) <= 0) {
return true;
}
}
return false; return false;
} }
/**
 * Determines the turn direction of the point triple (a, b, c).
 *
 * @return 1 when a, b and c wind counter-clockwise, -1 when they wind clockwise,
 *         and 0 when the three points are collinear
 */
// This is the "Orient2D" predicate; background reading:
// http://www.cs.berkeley.edu/~jrs/meshpapers/robnotes.pdf
// https://www.cs.cmu.edu/~quake/robust.html
// Caveat: plain double arithmetic is used here, without the exact floating point tricks.
private static int orient(double ax, double ay, double bx, double by, double cx, double cy) {
  final double left = (bx - ax) * (cy - ay);
  final double right = (cx - ax) * (by - ay);
  if (left < right) {
    return -1;
  }
  // strict comparison on purpose: equal (or unordered) products report "collinear"
  return left > right ? 1 : 0;
}
/** Returns a copy of the internal latitude array */ /** Returns a copy of the internal latitude array */
public double[] getPolyLats() { public double[] getPolyLats() {
return polyLats.clone(); return polyLats.clone();

View File

@ -37,7 +37,7 @@ public class SloppyMath {
* specified in decimal degrees (latitude/longitude). This works correctly * specified in decimal degrees (latitude/longitude). This works correctly
* even if the dateline is between the two points. * even if the dateline is between the two points.
* <p> * <p>
* Error is at most 2E-1 (20cm) from the actual haversine distance, but is typically * Error is at most 4E-1 (40cm) from the actual haversine distance, but is typically
* much smaller for reasonable distances: around 1E-5 (0.01mm) for distances less than * much smaller for reasonable distances: around 1E-5 (0.01mm) for distances less than
* 1000km. * 1000km.
* *

View File

@ -33,7 +33,7 @@ public class TestSloppyMath extends LuceneTestCase {
// accuracy for asin() // accuracy for asin()
static double ASIN_DELTA = 1E-7; static double ASIN_DELTA = 1E-7;
// accuracy for haversinMeters() // accuracy for haversinMeters()
static double HAVERSIN_DELTA = 2E-1; static double HAVERSIN_DELTA = 38E-2;
// accuracy for haversinMeters() for "reasonable" distances (< 1000km) // accuracy for haversinMeters() for "reasonable" distances (< 1000km)
static double REASONABLE_HAVERSIN_DELTA = 1E-5; static double REASONABLE_HAVERSIN_DELTA = 1E-5;
@ -161,12 +161,29 @@ public class TestSloppyMath extends LuceneTestCase {
double lat2 = GeoTestUtil.nextLatitude(); double lat2 = GeoTestUtil.nextLatitude();
double lon2 = GeoTestUtil.nextLongitude(); double lon2 = GeoTestUtil.nextLongitude();
double expected = haversinMeters(lat1, lon1, lat2, lon2); double expected = slowHaversin(lat1, lon1, lat2, lon2);
double actual = slowHaversin(lat1, lon1, lat2, lon2); double actual = haversinMeters(lat1, lon1, lat2, lon2);
assertEquals(expected, actual, HAVERSIN_DELTA); assertEquals(expected, actual, HAVERSIN_DELTA);
} }
} }
/**
 * Walks a fixed 10-degree grid over every pair of points on the globe so that the
 * very large distances are always exercised, rather than hoping the random number
 * generator happens to pick them.
 */
public void testAcrossWholeWorldSteps() {
  final int step = 10;
  for (int fromLat = -90; fromLat <= 90; fromLat += step) {
    for (int fromLon = -180; fromLon <= 180; fromLon += step) {
      for (int toLat = -90; toLat <= 90; toLat += step) {
        for (int toLon = -180; toLon <= 180; toLon += step) {
          // the slow implementation is the reference, the sloppy one is under test
          final double reference = slowHaversin(fromLat, fromLon, toLat, toLon);
          final double sloppy = haversinMeters(fromLat, fromLon, toLat, toLon);
          assertEquals(reference, sloppy, HAVERSIN_DELTA);
        }
      }
    }
  }
}
public void testAgainstSlowVersionReasonable() { public void testAgainstSlowVersionReasonable() {
for (int i = 0; i < 100_000; i++) { for (int i = 0; i < 100_000; i++) {
double lat1 = GeoTestUtil.nextLatitude(); double lat1 = GeoTestUtil.nextLatitude();

View File

@ -48,7 +48,7 @@ import static org.apache.lucene.geo.GeoEncodingUtils.decodeLongitude;
// relational operations as rectangle <-> rectangle relations in integer space in log(n) time.. // relational operations as rectangle <-> rectangle relations in integer space in log(n) time..
final class LatLonGrid { final class LatLonGrid {
// must be a power of two! // must be a power of two!
static final int GRID_SIZE = 1<<5; static final int GRID_SIZE = 1<<7;
final int minLat; final int minLat;
final int maxLat; final int maxLat;
final int minLon; final int minLon;

View File

@ -129,6 +129,9 @@ Bug Fixes
* SOLR-9004: Fix "name" field type definition in films example. (Alexandre Rafalovitch via Varun Thacker) * SOLR-9004: Fix "name" field type definition in films example. (Alexandre Rafalovitch via Varun Thacker)
* SOLR-8983: Clean up clusterstate and replicas for a failed create collection request
(Varun Thacker, Anshum Gupta)
Optimizations Optimizations
---------------------- ----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation. * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

View File

@ -1961,10 +1961,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
} }
processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet()); processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
// Let's cleanup as we hit an exception
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
// element, which may be interpreted by the user as a positive ack
cleanupCollection(collectionName, new NamedList());
log.info("Cleaned up artifacts for failed create collection for [" + collectionName + "]");
} else {
log.debug("Finished create command on all shards for collection: " log.debug("Finished create command on all shards for collection: "
+ collectionName); + collectionName);
}
} catch (SolrException ex) { } catch (SolrException ex) {
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
@ -1972,6 +1978,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
} }
} }
/**
 * Removes the named collection by building a delete operation message and handing it
 * to {@code deleteCollection}.
 *
 * @param collectionName name of the collection to remove
 * @param results        list passed through to {@code deleteCollection}
 */
private void cleanupCollection(String collectionName, NamedList results) throws KeeperException, InterruptedException {
  log.error("Cleaning up collection [" + collectionName + "]." );
  // same message shape as a regular delete request: the operation name plus the collection name
  Map<String, Object> deleteProps = makeMap(
      Overseer.QUEUE_OPERATION, DELETE.toLower(),
      NAME, collectionName);
  final ZkNodeProps deleteMessage = new ZkNodeProps(deleteProps);
  deleteCollection(deleteMessage, results);
}
private Map<Position, String> identifyNodes(ClusterState clusterState, private Map<Position, String> identifyNodes(ClusterState clusterState,
List<String> nodeList, List<String> nodeList,
ZkNodeProps message, ZkNodeProps message,

View File

@ -28,6 +28,7 @@ import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.GatherNodesStream;
import org.apache.solr.client.solrj.io.graph.ShortestPathStream; import org.apache.solr.client.solrj.io.graph.ShortestPathStream;
import org.apache.solr.client.solrj.io.ops.ConcatOperation; import org.apache.solr.client.solrj.io.ops.ConcatOperation;
import org.apache.solr.client.solrj.io.ops.DistinctOperation; import org.apache.solr.client.solrj.io.ops.DistinctOperation;
@ -117,11 +118,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
.withFunctionName("outerHashJoin", OuterHashJoinStream.class) .withFunctionName("outerHashJoin", OuterHashJoinStream.class)
.withFunctionName("intersect", IntersectStream.class) .withFunctionName("intersect", IntersectStream.class)
.withFunctionName("complement", ComplementStream.class) .withFunctionName("complement", ComplementStream.class)
.withFunctionName("daemon", DaemonStream.class)
.withFunctionName("sort", SortStream.class) .withFunctionName("sort", SortStream.class)
.withFunctionName("daemon", DaemonStream.class)
// graph streams
.withFunctionName("shortestPath", ShortestPathStream.class) .withFunctionName("shortestPath", ShortestPathStream.class)
.withFunctionName("gatherNodes", GatherNodesStream.class)
// metrics // metrics
.withFunctionName("min", MinMetric.class) .withFunctionName("min", MinMetric.class)
@ -274,6 +274,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
public Tuple read() { public Tuple read() {
String msg = e.getMessage(); String msg = e.getMessage();
Throwable t = e.getCause();
while(t != null) {
msg = t.getMessage();
t = t.getCause();
}
Map m = new HashMap(); Map m = new HashMap();
m.put("EOF", true); m.put("EOF", true);
m.put("EXCEPTION", msg); m.put("EXCEPTION", msg);

View File

@ -77,7 +77,7 @@ import org.apache.solr.util.plugin.SolrCoreAware;
* In the ExampleFieldMutatingUpdateProcessorFactory configured below, * In the ExampleFieldMutatingUpdateProcessorFactory configured below,
* fields will be mutated if the name starts with "foo" <i>or</i> "bar"; * fields will be mutated if the name starts with "foo" <i>or</i> "bar";
* <b>unless</b> the field name contains the substring "SKIP" <i>or</i> * <b>unless</b> the field name contains the substring "SKIP" <i>or</i>
* the fieldType is (or subclasses) DateField. Meaning a field named * the fieldType is (or subclasses) TrieDateField. Meaning a field named
* "foo_SKIP" is guaranteed not to be selected, but a field named "bar_smith" * "foo_SKIP" is guaranteed not to be selected, but a field named "bar_smith"
* that uses StrField will be selected. * that uses StrField will be selected.
* </p> * </p>

View File

@ -26,8 +26,19 @@ import java.lang.management.ManagementFactory;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
@ -39,7 +50,6 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create; import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
@ -375,6 +385,14 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
} }
/**
 * Sends {@code request} through a freshly created client for {@code baseUrl}, after
 * applying the given socket timeout to that client. The client is closed before returning.
 *
 * @param baseUrl       base URL the client is created for
 * @param request       request to execute
 * @param socketTimeout value passed to {@code setSoTimeout} on the client
 * @return the response returned by the client
 */
private NamedList<Object> makeRequest(String baseUrl, SolrRequest request, int socketTimeout)
    throws SolrServerException, IOException {
  try (SolrClient solrClient = createNewSolrClient("", baseUrl)) {
    // the timeout setter lives on the HTTP implementation, hence the cast
    final HttpSolrClient httpClient = (HttpSolrClient) solrClient;
    httpClient.setSoTimeout(socketTimeout);
    return solrClient.request(request);
  }
}
private NamedList<Object> makeRequest(String baseUrl, SolrRequest request) private NamedList<Object> makeRequest(String baseUrl, SolrRequest request)
throws SolrServerException, IOException { throws SolrServerException, IOException {
try (SolrClient client = createNewSolrClient("", baseUrl)) { try (SolrClient client = createNewSolrClient("", baseUrl)) {
@ -525,8 +543,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa
params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2); params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2);
request = new QueryRequest(params); request = new QueryRequest(params);
request.setPath("/admin/collections"); request.setPath("/admin/collections");
gotExp = false; NamedList<Object> resp = makeRequest(baseUrl, request, 60000);
NamedList<Object> resp = makeRequest(baseUrl, request);;
SimpleOrderedMap success = (SimpleOrderedMap) resp.get("success"); SimpleOrderedMap success = (SimpleOrderedMap) resp.get("success");
SimpleOrderedMap failure = (SimpleOrderedMap) resp.get("failure"); SimpleOrderedMap failure = (SimpleOrderedMap) resp.get("failure");

View File

@ -0,0 +1,84 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.Properties;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Checks that a collection whose creation fails does not show up in the
 * list of collections afterwards.
 */
public class CreateCollectionCleanupTest extends SolrCloudTestCase {
// solr.xml content handed to the test cluster. Besides the shard handler and solrcloud
// settings it sets createCollectionWaitTimeTillActive, with a default of 10.
protected static final String CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT = "<solr>\n" +
"\n" +
" <str name=\"shareSchema\">${shareSchema:false}</str>\n" +
" <str name=\"configSetBaseDir\">${configSetBaseDir:configsets}</str>\n" +
" <str name=\"coreRootDirectory\">${coreRootDirectory:.}</str>\n" +
"\n" +
" <shardHandlerFactory name=\"shardHandlerFactory\" class=\"HttpShardHandlerFactory\">\n" +
" <str name=\"urlScheme\">${urlScheme:}</str>\n" +
" <int name=\"socketTimeout\">${socketTimeout:90000}</int>\n" +
" <int name=\"connTimeout\">${connTimeout:15000}</int>\n" +
" </shardHandlerFactory>\n" +
"\n" +
" <solrcloud>\n" +
" <str name=\"host\">127.0.0.1</str>\n" +
" <int name=\"hostPort\">${hostPort:8983}</int>\n" +
" <str name=\"hostContext\">${hostContext:solr}</str>\n" +
" <int name=\"zkClientTimeout\">${solr.zkclienttimeout:30000}</int>\n" +
" <bool name=\"genericCoreNodeNames\">${genericCoreNodeNames:true}</bool>\n" +
" <int name=\"leaderVoteWait\">10000</int>\n" +
" <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
" <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
" <int name=\"createCollectionWaitTimeTillActive\">${createCollectionWaitTimeTillActive:10}</int>\n" +
" </solrcloud>\n" +
" \n" +
"</solr>\n";
/**
 * Configures a one-node cluster with the "conf1" config set (taken from the
 * cloud-minimal configset directory) and the solr.xml defined in this class.
 */
@BeforeClass
public static void createCluster() throws Exception {
configureCluster(1)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.withSolrXml(CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT)
.configure();
}
/**
 * Requests creation of collection "foo" with a data dir of "/some_invalid_dir/foo",
 * asserts the response is not a success, then asserts that a LIST request does not
 * report "foo".
 */
@Test
public void testCreateCollectionCleanup() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
// Create a collection that would fail
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("foo","conf1",1,1);
Properties properties = new Properties();
// NOTE(review): presumably this path cannot be created on the test machine, which is what makes the create fail - confirm
properties.put(CoreAdminParams.DATA_DIR, "/some_invalid_dir/foo");
create.setProperties(properties);
CollectionAdminResponse rsp = create.process(cloudClient);
assertFalse(rsp.isSuccess());
// Confirm using LIST that the collection does not exist
CollectionAdminRequest.List list = CollectionAdminRequest.listCollections();
rsp = list.process(cloudClient);
assertFalse(((ArrayList) rsp.getResponse().get("collections")).contains("foo"));
}
}

View File

@ -20,7 +20,6 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -28,17 +27,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.CloudConfig.CloudConfigBuilder;
import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -81,6 +78,12 @@ public class TestStressLiveNodes extends SolrCloudTestCase {
} }
/**
 * Closes the shared cloud client after the tests in this class have run and drops the
 * static reference to it.
 *
 * @throws Exception if closing the client fails
 */
@AfterClass
private static void afterClass() throws Exception {
  // Nothing to release if the client was never created (e.g. class setup did not get
  // that far); an NPE here would only hide the original failure.
  if (CLOUD_CLIENT == null) {
    return;
  }
  try {
    CLOUD_CLIENT.close();
  } finally {
    // clear the static reference even when close() throws
    CLOUD_CLIENT = null;
  }
}
private static SolrZkClient newSolrZkClient() { private static SolrZkClient newSolrZkClient() {
assertNotNull(ZK_SERVER_ADDR); assertNotNull(ZK_SERVER_ADDR);
// WTF is CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT private? // WTF is CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT private?

View File

@ -2,15 +2,12 @@ package org.apache.solr.core;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
@ -19,6 +16,8 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.TestBlobHandler;
import org.apache.zookeeper.server.DataNode; import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZKDatabase;
@ -54,9 +53,8 @@ public class BlobRepositoryCloudTest extends SolrCloudTestCase {
// Thread.sleep(2000); // Thread.sleep(2000);
HashMap<String, String> params = new HashMap<>(); HashMap<String, String> params = new HashMap<>();
cluster.createCollection(".system", 1, 1, null, params); cluster.createCollection(".system", 1, 1, null, params);
// Thread.sleep(2000);
// test component will fail if it cant' find a blob with this data by this name // test component will fail if it cant' find a blob with this data by this name
postBlob("testResource", "foo,bar\nbaz,bam"); TestBlobHandler.postData(cluster.getSolrClient(), findLiveNodeURI(), "testResource", ByteBuffer.wrap("foo,bar\nbaz,bam".getBytes(StandardCharsets.UTF_8)));
// Thread.sleep(2000); // Thread.sleep(2000);
// if these don't load we probably failed to post the blob above // if these don't load we probably failed to post the blob above
cluster.createCollection("col1", 1, 1, "configname", params); cluster.createCollection("col1", 1, 1, "configname", params);
@ -95,28 +93,10 @@ public class BlobRepositoryCloudTest extends SolrCloudTestCase {
assertLastQueryToCollection("col2"); assertLastQueryToCollection("col2");
} }
// TODO: move this up to parent class? Probably accepting entity, or with alternative signatures
private static void postBlob(String name, String string) throws IOException {
HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name);
StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream"));
post.setEntity(csv);
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
httpclient.execute(post);
}
}
// TODO: move this up to parent class? // TODO: move this up to parent class?
private static String findLiveNodeURI() { private static String findLiveNodeURI() {
ZkTestServer zkServer = cluster.getZkServer(); ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
ZKDatabase zkDatabase = zkServer.getZKDatabase(); return zkStateReader.getBaseUrlForNodeName(zkStateReader.getClusterState().getCollection(".system").getSlices().iterator().next().getLeader().getNodeName());
DataTree dataTree = zkDatabase.getDataTree();
DataNode node = dataTree.getNode("/solr/live_nodes");
Set<String> children = node.getChildren();
String liveNode = children.iterator().next();
String[] split = liveNode.split("_");
String host = split[0];
String name = split[1];
return "http://" + host + "/" + name;
} }
private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException { private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException {

View File

@ -0,0 +1,580 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.graph;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor;
import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
public class GatherNodesStream extends TupleStream implements Expressible {
private String zkHost;
private String collection;
private StreamContext streamContext;
private Map queryParams;
private String traverseFrom;
private String traverseTo;
private String gather;
private boolean trackTraversal;
private boolean useDefaultTraversal;
private TupleStream tupleStream;
private Set<Traversal.Scatter> scatter;
private Iterator<Tuple> out;
private Traversal traversal;
private List<Metric> metrics;
/**
 * Creates the stream from explicit values; every argument is passed unchanged to
 * {@code init}, in the same order.
 *
 * @param zkHost         forwarded to {@code init} (presumably the ZooKeeper address - TODO confirm)
 * @param collection     forwarded to {@code init} (presumably the collection name - TODO confirm)
 * @param tupleStream    forwarded to {@code init}
 * @param traverseFrom   forwarded to {@code init}
 * @param traverseTo     forwarded to {@code init}
 * @param gather         forwarded to {@code init}
 * @param queryParams    forwarded to {@code init}
 * @param metrics        forwarded to {@code init}
 * @param trackTraversal forwarded to {@code init}
 * @param scatter        forwarded to {@code init}
 */
public GatherNodesStream(String zkHost,
String collection,
TupleStream tupleStream,
String traverseFrom,
String traverseTo,
String gather,
Map queryParams,
List<Metric> metrics,
boolean trackTraversal,
Set<Traversal.Scatter> scatter) {
// all construction work lives in init
init(zkHost,
collection,
tupleStream,
traverseFrom,
traverseTo,
gather,
queryParams,
metrics,
trackTraversal,
scatter);
}
/**
 * Builds a GatherNodesStream from a parsed streaming expression.
 *
 * <p>Operands read here: the collection name (first value operand), an optional inner stream
 * expression that supplies the root tuples, any metric expressions, and the named parameters
 * {@code gather}, {@code walk}, {@code scatter}, {@code trackTraversal} and {@code zkHost}.
 * Every other named parameter is kept as a query parameter for the edge queries.
 *
 * <p>{@code walk} has the form {@code from->to}. When no inner stream is given, the left side is
 * treated as a comma separated list of root node ids served by a {@link NodeStream}.
 *
 * @param expression the parsed expression
 * @param factory    factory used to resolve operands, inner streams, metrics and zkHost defaults
 * @throws IOException if the collection name, gather, walk or zkHost is missing, if walk is not
 *                     of the form {@code a->b}, or if scatter names neither branches nor leaves
 */
public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {
  String collectionName = factory.getValueOperand(expression, 0);
  List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
  StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
  List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);

  // Collection Name
  if(null == collectionName) {
    throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
  }

  // scatter: which levels to emit, defaulting to the leaves only.
  Set<Traversal.Scatter> scatter = new HashSet<>();
  StreamExpressionNamedParameter scatterExpression = factory.getNamedOperand(expression, "scatter");
  if(scatterExpression == null) {
    scatter.add(Traversal.Scatter.LEAVES);
  } else {
    String s = ((StreamExpressionValue)scatterExpression.getParameter()).getValue();
    for(String sv : s.split(",")) {
      sv = sv.trim();
      if(Traversal.Scatter.BRANCHES.toString().equalsIgnoreCase(sv)) {
        scatter.add(Traversal.Scatter.BRANCHES);
      } else if (Traversal.Scatter.LEAVES.toString().equalsIgnoreCase(sv)) {
        scatter.add(Traversal.Scatter.LEAVES);
      }
    }
    // An empty selection cannot be iterated and would only fail once read() is called,
    // so report it here with a message that names the offending parameter.
    if(scatter.isEmpty()) {
      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - scatter param must contain branches and/or leaves",expression));
    }
  }

  // gather (required)
  StreamExpressionNamedParameter gatherExpression = factory.getNamedOperand(expression, "gather");
  if(gatherExpression == null) {
    throw new IOException(String.format(Locale.ROOT,"invalid expression %s - gather param is required",expression));
  }
  String gather = ((StreamExpressionValue)gatherExpression.getParameter()).getValue();

  // walk (required)
  StreamExpressionNamedParameter edgeExpression = factory.getNamedOperand(expression, "walk");
  if(edgeExpression == null) {
    throw new IOException(String.format(Locale.ROOT,"invalid expression %s - walk param is required", expression));
  }

  String traverseFrom = null;
  String traverseTo = null;
  TupleStream stream = null;
  if(streamExpressions.size() > 0) {
    // Roots come from the inner stream; walk names the tuple field and the collection field.
    stream = factory.constructStream(streamExpressions.get(0));
    String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue();
    String[] fields = edge.split("->");
    if (fields.length != 2) {
      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - walk param separated by an -> and must contain two fields", expression));
    }
    traverseFrom = fields[0].trim();
    traverseTo = fields[1].trim();
  } else {
    // No inner stream: the left side of walk lists the root node ids literally.
    String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue();
    String[] fields = edge.split("->");
    if (fields.length != 2) {
      throw new IOException(String.format(Locale.ROOT, "invalid expression %s - walk param separated by an -> and must contain two fields", expression));
    }
    List<String> rootIds = new ArrayList<>();
    for(String n : fields[0].split(",")) {
      rootIds.add(n.trim());
    }
    stream = new NodeStream(rootIds);
    traverseFrom = "node"; // NodeStream emits each id under the "node" key
    traverseTo = fields[1].trim();
  }

  // metrics
  List<StreamExpression> metricExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class);
  List<Metric> metrics = new ArrayList<>();
  for(StreamExpression metricExpression : metricExpressions) {
    metrics.add(factory.constructMetric(metricExpression));
  }

  // trackTraversal: when absent, read() takes the setting from the shared Traversal instead.
  boolean trackTraversal = false;
  StreamExpressionNamedParameter trackExpression = factory.getNamedOperand(expression, "trackTraversal");
  if(trackExpression != null) {
    trackTraversal = Boolean.parseBoolean(((StreamExpressionValue) trackExpression.getParameter()).getValue());
  } else {
    useDefaultTraversal = true;
  }

  // Everything that is not one of this stream's own parameters is passed on as a query parameter.
  Map<String,String> params = new HashMap<String,String>();
  for(StreamExpressionNamedParameter namedParam : namedParams){
    if(!namedParam.getName().equals("zkHost") &&
       !namedParam.getName().equals("gather") &&
       !namedParam.getName().equals("walk") &&
       !namedParam.getName().equals("scatter") &&
       !namedParam.getName().equals("trackTraversal"))
    {
      params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
    }
  }

  // zkHost, optional - if not provided then will look into factory list to get
  String zkHost = null;
  if(null == zkHostExpression){
    zkHost = factory.getCollectionZkHost(collectionName);
    if(zkHost == null) {
      zkHost = factory.getDefaultZkHost();
    }
  } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue) {
    zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
  }
  if(null == zkHost){
    throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
  }

  // We've got all the required items
  init(zkHost,
       collectionName,
       stream,
       traverseFrom,
       traverseTo,
       gather,
       params,
       metrics,
       trackTraversal,
       scatter);
}
/**
 * Stores the resolved configuration on this instance. Shared by both constructors;
 * it performs plain field assignments only.
 */
private void init(String zkHost,
                  String collection,
                  TupleStream tupleStream,
                  String traverseFrom,
                  String traverseTo,
                  String gather,
                  Map queryParams,
                  List<Metric> metrics,
                  boolean trackTraversal,
                  Set<Traversal.Scatter> scatter) {
  // where to query
  this.zkHost = zkHost;
  this.collection = collection;
  this.queryParams = queryParams;

  // what to walk
  this.tupleStream = tupleStream;
  this.traverseFrom = traverseFrom;
  this.traverseTo = traverseTo;
  this.gather = gather;

  // what to report
  this.metrics = metrics;
  this.scatter = scatter;
  this.trackTraversal = trackTraversal;
}
/**
 * Serialises this stream back into a streaming expression.
 *
 * <p>Emitted in order: the collection, the inner stream, every query parameter, the metrics,
 * then the named parameters {@code zkHost}, {@code gather}, {@code walk},
 * {@code trackTraversal} and {@code scatter}.
 *
 * @param factory factory used to look up function names and to serialise nested expressions
 * @return the expression describing this stream
 * @throws IOException if the inner stream is not {@link Expressible}
 */
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
  StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));

  // collection
  expression.addParameter(collection);

  // inner stream
  if(tupleStream instanceof Expressible){
    expression.addParameter(((Expressible)tupleStream).toExpression(factory));
  }
  else{
    throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression");
  }

  // parameters
  Set<Map.Entry> entries = queryParams.entrySet();
  for(Map.Entry param : entries){
    String value = param.getValue().toString();
    // SOLR-8409: This is a special case where the params contain a " character
    // Do note that in any other BASE streams with parameters where a " might come into play
    // that this same replacement needs to take place.
    value = value.replace("\"", "\\\"");
    expression.addParameter(new StreamExpressionNamedParameter(param.getKey().toString(), value));
  }

  // metrics
  if(metrics != null) {
    for (Metric metric : metrics) {
      expression.addParameter(metric.toExpression(factory));
    }
  }

  expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));
  // The gather parameter must carry the gather field, not the ZooKeeper host, otherwise the
  // expression rebuilt from this output would gather on the wrong field.
  expression.addParameter(new StreamExpressionNamedParameter("gather", gather));
  expression.addParameter(new StreamExpressionNamedParameter("walk", traverseFrom+"->"+traverseTo));
  expression.addParameter(new StreamExpressionNamedParameter("trackTraversal", Boolean.toString(trackTraversal)));

  // scatter, as a comma separated list
  StringBuilder buf = new StringBuilder();
  for(Traversal.Scatter sc : scatter) {
    if(buf.length() > 0) {
      buf.append(",");
    }
    buf.append(sc.toString());
  }
  expression.addParameter(new StreamExpressionNamedParameter("scatter", buf.toString()));

  return expression;
}
/**
 * Attaches this stream, and the stream it reads from, to a context carrying a {@link Traversal}.
 *
 * <p>If the supplied context already holds a "traversal" entry, that traversal and context are
 * used as they are. Otherwise a private copy of the context is created with a fresh traversal,
 * so that two separate traversals in the same expression do not pollute each other.
 */
public void setStreamContext(StreamContext context) {
  this.traversal = (Traversal) context.get("traversal");

  StreamContext target = context;
  if (this.traversal == null) {
    // No traversal yet: clone the caller's context and start a new traversal in the clone.
    target = new StreamContext();
    target.numWorkers = context.numWorkers;
    target.workerID = context.workerID;
    target.setSolrClientCache(context.getSolrClientCache());
    target.setStreamFactory(context.getStreamFactory());
    for (Object entryKey : context.getEntries().keySet()) {
      target.put(entryKey, context.get(entryKey));
    }
    this.traversal = new Traversal();
    target.put("traversal", this.traversal);
  }

  this.tupleStream.setStreamContext(target);
  this.streamContext = target;
}
/** Returns a new list holding the single stream this stream reads from. */
public List<TupleStream> children() {
  List<TupleStream> childStreams = new ArrayList<TupleStream>();
  childStreams.add(this.tupleStream);
  return childStreams;
}
/**
 * Opens the underlying tuple stream. The traversal itself is performed on the first
 * call to read(), not here.
 */
public void open() throws IOException {
tupleStream.open();
}
/**
 * Fetches, for one batch of node ids, every edge of the collection whose {@code traverseTo}
 * field matches one of those ids. Edges are read from the /export handler, sorted by
 * gather then traverseTo, and de-duplicated on that field pair.
 */
private class JoinRunner implements Callable<List<Tuple>> {

  private List<String> nodes;
  private List<Tuple> edges = new ArrayList<>();

  public JoinRunner(List<String> nodes) {
    this.nodes = nodes;
  }

  /**
   * Runs the edge query for this batch.
   *
   * @return the unique edge tuples found
   * @throws RuntimeException wrapping any failure while building, opening, reading or closing
   *                          the Solr stream
   */
  public List<Tuple> call() {

    // Field list: gather + traverseTo, the metric columns, and any caller supplied fl.
    Set<String> flSet = new HashSet<>();
    flSet.add(gather);
    flSet.add(traverseTo);

    //Add the metric columns
    if(metrics != null) {
      for(Metric metric : metrics) {
        for(String column : metric.getColumns()) {
          flSet.add(column);
        }
      }
    }

    if(queryParams.containsKey("fl")) {
      String flString = (String)queryParams.get("fl");
      for(String f : flString.split(",")) {
        flSet.add(f.trim());
      }
    }

    StringBuilder fl = new StringBuilder();
    for(String field : flSet) {
      if(fl.length() > 0) {
        fl.append(",");
      }
      fl.append(field);
    }

    // Caller params first, so the entries below always win.
    Map joinParams = new HashMap();
    joinParams.putAll(queryParams);
    joinParams.put("fl", fl.toString());
    joinParams.put("qt", "/export");
    joinParams.put("sort", gather + " asc,"+traverseTo +" asc");

    StringBuilder nodeQuery = new StringBuilder();
    for(String node : nodes) {
      nodeQuery.append(node).append(" ");
    }
    String q = traverseTo + ":(" + nodeQuery.toString().trim() + ")";
    joinParams.put("q", q);

    TupleStream stream = null;
    try {
      stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinParams), new MultipleFieldEqualitor(new FieldEqualitor(gather), new FieldEqualitor(traverseTo)));
      stream.setStreamContext(streamContext);
      stream.open();
      while (true) {
        Tuple tuple = stream.read();
        if (tuple.EOF) {
          break;
        }
        edges.add(tuple);
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      // stream is still null when construction itself failed; closing it then would raise a
      // NullPointerException that hides the real cause.
      if (stream != null) {
        try {
          stream.close();
        } catch(Exception ce) {
          throw new RuntimeException(ce);
        }
      }
    }
    return edges;
  }
}
/** Closes the underlying tuple stream. */
public void close() throws IOException {
tupleStream.close();
}
/**
 * Returns the next node tuple of the traversal, or an EOF tuple when there are none left.
 *
 * <p>The first call performs the whole gather step: it drains the underlying stream, sends the
 * node ids to Solr in batches of 400 on a pool of 4 threads, merges the returned edges into a
 * new traversal level and creates the output iterator. Later calls only advance that iterator.
 *
 * @throws RuntimeException wrapping any failure that occurs during the gather step
 */
public Tuple read() throws IOException {

  if (out == null) {
    List<String> joinBatch = new ArrayList<>();
    List<Future<List<Tuple>>> futures = new ArrayList<>();
    Map<String, Node> level = new HashMap<>();

    ExecutorService threadPool = null;
    try {
      threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(4, new SolrjNamedThreadFactory("GatherNodesStream"));

      Map<String, Node> roots = new HashMap<>();

      while (true) {
        Tuple tuple = tupleStream.read();
        if (tuple.EOF) {
          // Flush the last, partially filled batch.
          if (joinBatch.size() > 0) {
            futures.add(threadPool.submit(new JoinRunner(joinBatch)));
          }
          break;
        }

        String value = tuple.getString(traverseFrom);

        if(traversal.getDepth() == 0) {
          //This gathers the root nodes
          //We check to see if there are dupes in the root nodes because root streams may not have been uniqued.
          String key = collection+"."+value;
          if(roots.containsKey(key)) {
            continue;
          }
          roots.put(key, newNode(value));
        }

        joinBatch.add(value);
        if (joinBatch.size() == 400) {
          futures.add(threadPool.submit(new JoinRunner(joinBatch)));
          joinBatch = new ArrayList<>();
        }
      }

      if(traversal.getDepth() == 0) {
        traversal.addLevel(roots, collection, traverseFrom);
      }

      this.traversal.setScatter(scatter);

      if(useDefaultTraversal) {
        this.trackTraversal = traversal.getTrackTraversal();
      } else {
        this.traversal.setTrackTraversal(trackTraversal);
      }

      for (Future<List<Tuple>> future : futures) {
        for (Tuple tuple : future.get()) {
          String _traverseTo = tuple.getString(traverseTo);
          String _gather = tuple.getString(gather);
          String key = collection + "." + _gather;
          // visited() records the edge itself when the node already exists on an earlier level.
          if (!traversal.visited(key, _traverseTo, tuple)) {
            Node node = level.get(key);
            if (node == null) {
              node = newNode(_gather);
              level.put(key, node);
            }
            node.add((traversal.getDepth()-1)+"^"+_traverseTo, tuple);
          }
        }
      }

      traversal.addLevel(level, collection, gather);
      out = traversal.iterator();
    } catch(Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interrupt visible to the caller's thread.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    } finally {
      // The pool reference is still null if its creation failed; guard so that the
      // original exception is the one that propagates.
      if (threadPool != null) {
        threadPool.shutdown();
      }
    }
  }

  if (out.hasNext()) {
    return out.next();
  } else {
    Map map = new HashMap();
    map.put("EOF", true);
    Tuple tuple = new Tuple(map);
    return tuple;
  }
}

/** Creates a node for the given id, giving it its own fresh instance of every configured metric. */
private Node newNode(String id) {
  Node node = new Node(id, trackTraversal);
  if (metrics != null) {
    List<Metric> nodeMetrics = new ArrayList<>();
    for (Metric metric : metrics) {
      nodeMetrics.add(metric.newInstance());
    }
    node.setMetrics(nodeMetrics);
  }
  return node;
}
/** Always returns 0; no cost estimate is computed for this stream. */
public int getCost() {
return 0;
}
/** Always returns null; this stream does not report a sort order for the tuples it emits. */
@Override
public StreamComparator getStreamSort() {
return null;
}
/**
 * Minimal in-memory stream that emits one tuple per id under the key "node", followed by
 * EOF tuples. Used for the root nodes when the walk parameter lists them literally.
 */
class NodeStream extends TupleStream {

  private List<String> ids;
  private Iterator<String> it;

  public NodeStream(List<String> ids) {
    this.ids = ids;
  }

  /** Starts (or restarts) iteration over the ids. */
  public void open() {
    this.it = ids.iterator();
  }

  /** Nothing to release. */
  public void close() {
  }

  public StreamComparator getStreamSort() {
    return null;
  }

  /** This stream has no child streams. */
  public List<TupleStream> children() {
    return new ArrayList();
  }

  /** The context is not used by this stream. */
  public void setStreamContext(StreamContext context) {
  }

  /** Returns the next id wrapped in a tuple, or an EOF tuple once the ids are used up. */
  public Tuple read() {
    HashMap fields = new HashMap();
    boolean exhausted = !it.hasNext();
    if (exhausted) {
      fields.put("EOF", true);
    } else {
      fields.put("node", it.next());
    }
    return new Tuple(fields);
  }
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.graph;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.metrics.*;
import java.util.*;
/**
 * A node gathered during a graph traversal: its id, optionally the set of ancestors it was
 * reached from, and optionally a list of metrics updated with every edge tuple that reaches it.
 *
 * <p>Ancestors are stored as {@code <level index>^<ancestor id>}; the level index is used by
 * {@link #toTuple} to look up the ancestor's collection.
 */
public class Node {

  private String id;
  private List<Metric> metrics;
  private Set<String> ancestors;

  /**
   * @param id    the node id
   * @param track whether ancestors should be recorded for this node
   */
  public Node(String id, boolean track) {
    this.id = id;
    if(track) {
      ancestors = new HashSet<>();
    }
  }

  /** Sets the metrics that {@link #add} updates; null disables metric collection. */
  public void setMetrics(List<Metric> metrics) {
    this.metrics = metrics;
  }

  /**
   * Registers one edge that reaches this node.
   *
   * @param ancestor ancestor key in the form {@code <level index>^<ancestor id>}; only stored
   *                 when ancestor tracking is enabled
   * @param tuple    the edge tuple, passed to every metric
   */
  public void add(String ancestor, Tuple tuple) {
    if(ancestors != null) {
      ancestors.add(ancestor);
    }

    if(metrics != null) {
      for(Metric metric : metrics) {
        metric.update(tuple);
      }
    }
  }

  /**
   * Renders this node as an output tuple with the keys "node", "collection", "field" and
   * "level", plus "ancestors" when tracking is enabled and one entry per metric.
   *
   * <p>When the traversal spans more than one collection, each ancestor is written as
   * {@code <collection>/<ancestor id>}; otherwise only the ancestor id is written.
   */
  public Tuple toTuple(String collection, String field, int level, Traversal traversal) {
    Map map = new HashMap();

    map.put("node", id);
    map.put("collection", collection);
    map.put("field", field);
    map.put("level", level);

    boolean prependCollection = traversal.isMultiCollection();
    List<String> cols = traversal.getCollections();

    if(ancestors != null) {
      List<String> l = new ArrayList<>();
      for(String ancestor : ancestors) {
        // Split on the first '^' only: the ancestor id itself may contain '^' or be empty,
        // and an unlimited split would truncate it or drop the trailing empty part.
        String[] ancestorParts = ancestor.split("\\^", 2);

        if(prependCollection) {
          //prepend the collection
          int colIndex = Integer.parseInt(ancestorParts[0]);
          l.add(cols.get(colIndex)+"/"+ancestorParts[1]);
        } else {
          // Use only the ancestor id.
          l.add(ancestorParts[1]);
        }
      }

      map.put("ancestors", l);
    }

    if(metrics != null) {
      for(Metric metric : metrics) {
        map.put(metric.getIdentifier(), metric.getValue());
      }
    }

    return new Tuple(map);
  }
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.graph;
import org.apache.solr.client.solrj.io.Tuple;
import java.util.*;
/**
 * Holds the state of a graph traversal: one map of nodes per level, together with the
 * collection and field each level came from, the scatter setting and the ancestor-tracking flag.
 */
public class Traversal {

  /** Selects which levels {@link #iterator()} emits. */
  public enum Scatter {
    BRANCHES,
    LEAVES;
  }

  private List<Map<String, Node>> graph = new ArrayList<Map<String, Node>>();
  private List<String> fields = new ArrayList<String>();
  private List<String> collections = new ArrayList<String>();
  private Set<Scatter> scatter = new HashSet<Scatter>();
  private Set<String> collectionSet = new HashSet<String>();
  private boolean trackTraversal;
  private int depth;

  /** Appends a level, records its collection and field, and increases the depth by one. */
  public void addLevel(Map<String, Node> level, String collection, String field) {
    graph.add(level);
    fields.add(field);
    collections.add(collection);
    collectionSet.add(collection);
    depth = depth + 1;
  }

  /**
   * Checks whether a node already exists on any level added so far. When it does, the edge
   * is recorded on that existing node and true is returned; otherwise nothing is changed.
   *
   * NOTE(review): the ancestor key built here uses the current depth, whereas
   * GatherNodesStream uses depth - 1 for newly created nodes - confirm this is intended.
   */
  public boolean visited(String nodeId, String ancestorId, Tuple tuple) {
    Node known = null;
    for (int i = 0; i < graph.size() && known == null; i++) {
      known = graph.get(i).get(nodeId);
    }
    if (known == null) {
      return false;
    }
    known.add(depth + "^" + ancestorId, tuple);
    return true;
  }

  /** Creates an iterator over the node tuples selected by the current scatter setting. */
  public Iterator<Tuple> iterator() {
    return new TraversalIterator(this, scatter);
  }

  /** True when levels from more than one distinct collection have been added. */
  public boolean isMultiCollection() {
    return collectionSet.size() > 1;
  }

  /** Number of levels added so far. */
  public int getDepth() {
    return depth;
  }

  /** The levels in the order they were added; this is the internal list, not a copy. */
  public List<Map<String, Node>> getGraph() {
    return graph;
  }

  /** Collection of each level, by level index. */
  public List<String> getCollections() {
    return this.collections;
  }

  /** Field of each level, by level index. */
  public List<String> getFields() {
    return this.fields;
  }

  public Set<Scatter> getScatter() {
    return this.scatter;
  }

  public void setScatter(Set<Scatter> scatter) {
    this.scatter = scatter;
  }

  public boolean getTrackTraversal() {
    return this.trackTraversal;
  }

  public void setTrackTraversal(boolean trackTraversal) {
    this.trackTraversal = trackTraversal;
  }
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.graph;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.graph.Traversal.Scatter;
/**
 * Iterates over the nodes of a {@link Traversal} as tuples, level by level.
 *
 * <p>With {@code Scatter.BRANCHES} every level except the last is emitted; with
 * {@code Scatter.LEAVES} the last level is emitted. If the scatter set selects no level
 * at all, the iterator is simply empty.
 */
class TraversalIterator implements Iterator<Tuple> {

  private List<Map<String,Node>> graph;
  private List<String> collections;
  private List<String> fields;

  private Iterator<Iterator<Node>> graphIterator;
  private Iterator<Node> levelIterator;

  private Iterator<String> fieldIterator;
  private Iterator<String> collectionIterator;
  private Iterator<Integer> levelNumIterator;
  private String outField;
  private String outCollection;
  private int outLevel;
  private Traversal traversal;

  public TraversalIterator(Traversal traversal, Set<Scatter> scatter) {
    this.traversal = traversal;
    graph = traversal.getGraph();
    collections = traversal.getCollections();
    fields = traversal.getFields();

    // Parallel lists: entry i of each describes the i-th level that will be emitted.
    List<String> outCollections = new ArrayList<>();
    List<String> outFields = new ArrayList<>();
    List<Integer> levelNums = new ArrayList<>();
    List<Iterator<Node>> levelIterators = new ArrayList<>();

    if(scatter.contains(Scatter.BRANCHES)) {
      for(int i=0; i<graph.size()-1; i++) {
        outCollections.add(collections.get(i));
        outFields.add(fields.get(i));
        levelNums.add(i);
        levelIterators.add(graph.get(i).values().iterator());
      }
    }

    // Guard against an empty graph, where there is no last level to emit.
    if(scatter.contains(Scatter.LEAVES) && !graph.isEmpty()) {
      int leavesLevel = graph.size()-1;
      levelNums.add(leavesLevel);
      outCollections.add(collections.get(leavesLevel));
      outFields.add(fields.get(leavesLevel));
      levelIterators.add(graph.get(leavesLevel).values().iterator());
    }

    graphIterator = levelIterators.iterator();
    fieldIterator = outFields.iterator();
    collectionIterator = outCollections.iterator();
    levelNumIterator = levelNums.iterator();

    if(graphIterator.hasNext()) {
      advanceLevel();
    } else {
      // Nothing selected: start on an exhausted iterator instead of failing in next().
      levelIterator = new ArrayList<Node>().iterator();
    }
  }

  /** Moves to the next selected level and loads its collection, field and level number. */
  private void advanceLevel() {
    levelIterator = graphIterator.next();
    outField = fieldIterator.next();
    outCollection = collectionIterator.next();
    outLevel = levelNumIterator.next();
  }

  /** Returns true if any node is left, skipping over exhausted or empty levels. */
  @Override
  public boolean hasNext() {
    while(!levelIterator.hasNext()) {
      if(!graphIterator.hasNext()) {
        return false;
      }
      advanceLevel();
    }
    return true;
  }

  /**
   * Returns the next node as a tuple. When no node is left this returns null rather than
   * throwing, so callers are expected to check {@link #hasNext()} first.
   */
  @Override
  public Tuple next() {
    if(!hasNext()) {
      return null;
    }
    Node node = levelIterator.next();
    return node.toTuple(outCollection, outField, outLevel, traversal);
  }
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Iterator;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
/**
 * The RandomStream emits a stream of pseudo random Tuples that match the query parameters. Sample expression syntax:
 * random(collection, q="Hello word", rows="50", fl="title, body")
 *
 * <p>Randomness comes from sorting on a {@code random_<seed>} field with a fresh seed on every
 * {@link #open()}; any sort supplied by the caller is replaced.
 **/
public class RandomStream extends TupleStream implements Expressible {

  private String zkHost;
  private Map<String, String> props;
  private String collection;
  protected transient SolrClientCache cache;
  protected transient CloudSolrClient cloudSolrClient;
  private Iterator<SolrDocument> documentIterator;

  /**
   * @param zkHost     ZooKeeper host string used to reach the collection
   * @param collection collection to sample from
   * @param props      query parameters sent to Solr
   */
  public RandomStream(String zkHost,
                      String collection,
                      Map<String, String> props) throws IOException {
    init(zkHost, collection, props);
  }

  /**
   * Builds the stream from a parsed expression: the first value operand is the collection and
   * the named operands become query parameters.
   *
   * @throws IOException if the collection name is missing, no named parameter is given, or no
   *                     zkHost can be resolved
   */
  public RandomStream(StreamExpression expression, StreamFactory factory) throws IOException{
    // grab all parameters out
    String collectionName = factory.getValueOperand(expression, 0);
    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");

    // Collection Name
    if(null == collectionName){
      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression));
    }

    // Named parameters - passed directly to solr as solrparams
    if(0 == namedParams.size()){
      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 'q=*:*'",expression));
    }

    // pull out known named params
    Map<String,String> params = new HashMap<String,String>();
    for(StreamExpressionNamedParameter namedParam : namedParams){
      if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){
        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
      }
    }

    // zkHost, optional - if not provided then will look into factory list to get
    String zkHost = null;
    if(null == zkHostExpression){
      zkHost = factory.getCollectionZkHost(collectionName);
      if(zkHost == null) {
        zkHost = factory.getDefaultZkHost();
      }
    }
    else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){
      zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue();
    }
    if(null == zkHost){
      throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName));
    }

    // We've got all the required items
    init(zkHost, collectionName, params);
  }

  /** Stores the resolved configuration on this instance. */
  private void init(String zkHost, String collection, Map<String, String> props) throws IOException {
    this.zkHost = zkHost;
    this.props = props;
    this.collection = collection;
  }

  /** Serialises this stream as: function name, collection, every query parameter, zkHost. */
  @Override
  public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
    // function name
    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));

    // collection
    expression.addParameter(collection);

    // parameters
    for(Entry<String,String> param : props.entrySet()){
      expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue()));
    }

    // zkHost
    expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost));

    return expression;
  }

  /** Remembers the context's client cache; when present, open() takes its client from it. */
  public void setStreamContext(StreamContext context) {
    cache = context.getSolrClientCache();
  }

  /** This stream has no child streams. */
  public List<TupleStream> children() {
    return new ArrayList<TupleStream>();
  }

  /**
   * Runs the query with a random sort and keeps an iterator over the returned documents.
   *
   * @throws IOException wrapping any failure of the Solr request
   */
  public void open() throws IOException {
    if(cache != null) {
      cloudSolrClient = cache.getCloudSolrClient(zkHost);
    } else {
      cloudSolrClient = new CloudSolrClient(zkHost);
    }

    ModifiableSolrParams params = getParams(this.props);

    params.remove("sort"); //Override any sort.

    Random rand = new Random();
    int seed = rand.nextInt();

    String sortField = "random_"+seed;
    params.add("sort", sortField+" asc");

    QueryRequest request = new QueryRequest(params);
    try {
      QueryResponse response = request.process(cloudSolrClient, collection);
      SolrDocumentList docs = response.getResults();
      documentIterator = docs.iterator();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the client if this stream created it itself. Clients obtained from the cache are
   * left open. Safe to call when no client was ever created.
   */
  public void close() throws IOException {
    if(cache == null && cloudSolrClient != null) {
      cloudSolrClient.close();
    }
  }

  /** Returns the next document as a tuple of all its fields, or an EOF tuple when exhausted. */
  public Tuple read() throws IOException {
    if(documentIterator.hasNext()) {
      Map map = new HashMap();
      SolrDocument doc = documentIterator.next();
      for(String key : doc.keySet()) {
        map.put(key, doc.get(key));
      }
      return new Tuple(map);
    } else {
      Map fields = new HashMap();
      fields.put("EOF", true);
      Tuple tuple = new Tuple(fields);
      return tuple;
    }
  }

  /** Copies the string properties into a fresh, modifiable Solr parameter set. */
  private ModifiableSolrParams getParams(Map<String, String> props) {
    ModifiableSolrParams params = new ModifiableSolrParams();
    for(Entry<String, String> prop : props.entrySet()) {
      params.add(prop.getKey(), prop.getValue());
    }
    return params;
  }

  /** Always returns 0; no cost estimate is computed for this stream. */
  public int getCost() {
    return 0;
  }

  /** Always returns null; the output order is random. */
  @Override
  public StreamComparator getStreamSort() {
    return null;
  }
}

View File

@ -49,6 +49,10 @@ public class StreamContext implements Serializable{
this.entries.put(key, value); this.entries.put(key, value);
} }
/**
 * Returns the map holding this context's entries. This is the internal map itself,
 * not a copy, so changes made through it are visible to the context.
 */
public Map getEntries() {
return this.entries;
}
public void setSolrClientCache(SolrClientCache clientCache) { public void setSolrClientCache(SolrClientCache clientCache) {
this.clientCache = clientCache; this.clientCache = clientCache;
} }

View File

@ -50,6 +50,10 @@ public class CountMetric extends Metric implements Serializable {
} }
/** Returns an empty array: this metric names no columns. */
public String[] getColumns() {
return new String[0];
}
private void init(String functionName){ private void init(String functionName){
setFunctionName(functionName); setFunctionName(functionName);
setIdentifier(functionName, "(*)"); setIdentifier(functionName, "(*)");

View File

@ -67,6 +67,11 @@ public class MaxMetric extends Metric implements Serializable {
} }
} }
/** Returns a one-element array holding the column this metric is computed over. */
public String[] getColumns() {
  return new String[] {columnName};
}
public void update(Tuple tuple) { public void update(Tuple tuple) {
Object o = tuple.get(columnName); Object o = tuple.get(columnName);
if(o instanceof Double) { if(o instanceof Double) {

View File

@ -80,6 +80,11 @@ public class MeanMetric extends Metric implements Serializable {
return new MeanMetric(columnName); return new MeanMetric(columnName);
} }
/** Returns a one-element array holding the column this metric is computed over. */
public String[] getColumns() {
  return new String[] {columnName};
}
public double getValue() { public double getValue() {
double dcount = (double)count; double dcount = (double)count;
if(longSum == 0) { if(longSum == 0) {

View File

@ -54,4 +54,6 @@ public abstract class Metric implements Serializable, Expressible {
public abstract double getValue(); public abstract double getValue();
public abstract void update(Tuple tuple); public abstract void update(Tuple tuple);
public abstract Metric newInstance(); public abstract Metric newInstance();
/** Returns the names of the columns this metric needs; may be an empty array. */
public abstract String[] getColumns();
} }

View File

@ -58,6 +58,11 @@ public class MinMetric extends Metric {
} }
/** Returns a one-element array holding the column this metric is computed over. */
public String[] getColumns() {
  return new String[] {columnName};
}
public double getValue() { public double getValue() {
if(longMin == Long.MAX_VALUE) { if(longMin == Long.MAX_VALUE) {
return doubleMin; return doubleMin;

View File

@ -58,6 +58,11 @@ public class SumMetric extends Metric implements Serializable {
setIdentifier(functionName, "(", columnName, ")"); setIdentifier(functionName, "(", columnName, ")");
} }
/** Returns a one-element array holding the column this metric is computed over. */
public String[] getColumns() {
  return new String[] {columnName};
}
public void update(Tuple tuple) { public void update(Tuple tuple) {
Object o = tuple.get(columnName); Object o = tuple.get(columnName);
if(o instanceof Double) { if(o instanceof Double) {

View File

@ -51,6 +51,8 @@
<fieldType name="tlong" class="solr.TrieLongField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/> <fieldType name="tlong" class="solr.TrieLongField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/>
<fieldType name="tdouble" class="solr.TrieDoubleField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/> <fieldType name="tdouble" class="solr.TrieDoubleField" precisionStep="8" omitNorms="true" positionIncrementGap="0"/>
<fieldType name="random" class="solr.RandomSortField" indexed="true" />
<!-- numeric field types that manipulate the value into <!-- numeric field types that manipulate the value into
a string value that isn't human readable in it's internal form, a string value that isn't human readable in it's internal form,
but sorts correctly and supports range queries. but sorts correctly and supports range queries.
@ -564,6 +566,9 @@
<dynamicField name="*_mfacet" type="string" indexed="true" stored="false" multiValued="true" /> <dynamicField name="*_mfacet" type="string" indexed="true" stored="false" multiValued="true" />
<dynamicField name="random_*" type="random" />
<!-- make sure custom sims work with dynamic fields --> <!-- make sure custom sims work with dynamic fields -->
<!-- <!--
<dynamicField name="*_sim1" type="sim1" indexed="true" stored="true"/> <dynamicField name="*_sim1" type="sim1" indexed="true" stored="true"/>

View File

@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.io.graph;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -31,8 +32,15 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.stream.*; import org.apache.solr.client.solrj.io.stream.*;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase; import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
@ -117,6 +125,8 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
commit(); commit();
testShortestPathStream(); testShortestPathStream();
testGatherNodesStream();
testGatherNodesFriendsStream();
} }
private void testShortestPathStream() throws Exception { private void testShortestPathStream() throws Exception {
@ -265,9 +275,399 @@ public class GraphExpressionTest extends AbstractFullDistribZkTestBase {
commit(); commit();
} }
/**
 * Exercises {@code gatherNodes} over a small basket/product graph: a single
 * root node, a nested traversal with metrics, a list of root nodes, and a
 * negative filter query.
 *
 * The client cache is closed and the index is wiped in a {@code finally}
 * block so that a failing assertion cannot leak the cache or leave documents
 * behind for the sub-tests that run afterwards.
 */
private void testGatherNodesStream() throws Exception {

  indexr(id, "0", "basket_s", "basket1", "product_s", "product1", "price_f", "20");
  indexr(id, "1", "basket_s", "basket1", "product_s", "product3", "price_f", "30");
  indexr(id, "2", "basket_s", "basket1", "product_s", "product5", "price_f", "1");
  indexr(id, "3", "basket_s", "basket2", "product_s", "product1", "price_f", "2");
  indexr(id, "4", "basket_s", "basket2", "product_s", "product6", "price_f", "5");
  indexr(id, "5", "basket_s", "basket2", "product_s", "product7", "price_f", "10");
  indexr(id, "6", "basket_s", "basket3", "product_s", "product4", "price_f", "20");
  indexr(id, "7", "basket_s", "basket3", "product_s", "product3", "price_f", "10");
  indexr(id, "8", "basket_s", "basket3", "product_s", "product1", "price_f", "10");
  indexr(id, "9", "basket_s", "basket4", "product_s", "product4", "price_f", "40");
  indexr(id, "10", "basket_s", "basket4", "product_s", "product3", "price_f", "10");
  indexr(id, "11", "basket_s", "basket4", "product_s", "product1", "price_f", "10");

  commit();

  List<Tuple> tuples = null;
  GatherNodesStream stream = null;
  StreamContext context = new StreamContext();
  SolrClientCache cache = new SolrClientCache();

  try {
    context.setSolrClientCache(cache);

    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", zkServer.getZkAddress())
        .withFunctionName("gatherNodes", GatherNodesStream.class)
        .withFunctionName("search", CloudSolrStream.class)
        .withFunctionName("count", CountMetric.class)
        .withFunctionName("avg", MeanMetric.class)
        .withFunctionName("sum", SumMetric.class)
        .withFunctionName("min", MinMetric.class)
        .withFunctionName("max", MaxMetric.class);

    // Single root node: every basket that contains product1.
    String expr = "gatherNodes(collection1, " +
        "walk=\"product1->product_s\"," +
        "gather=\"basket_s\")";

    stream = (GatherNodesStream)factory.constructStream(expr);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    // Results are sorted by node so positional assertions are stable.
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(4, tuples.size());
    assertEquals("basket1", tuples.get(0).getString("node"));
    assertEquals("basket2", tuples.get(1).getString("node"));
    assertEquals("basket3", tuples.get(2).getString("node"));
    assertEquals("basket4", tuples.get(3).getString("node"));

    // Nested traversal: from the gathered baskets to their products, with
    // metrics attached. The expected results do not include product1.
    String expr2 = "gatherNodes(collection1, " +
        expr+","+
        "walk=\"node->basket_s\"," +
        "gather=\"product_s\", count(*), avg(price_f), sum(price_f), min(price_f), max(price_f))";

    stream = (GatherNodesStream)factory.constructStream(expr2);

    context = new StreamContext();
    context.setSolrClientCache(cache);

    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));

    assertEquals(5, tuples.size());

    assertEquals("product3", tuples.get(0).getString("node"));
    assertEquals(Double.valueOf(3.0D), tuples.get(0).getDouble("count(*)"));

    assertEquals("product4", tuples.get(1).getString("node"));
    assertEquals(Double.valueOf(2.0D), tuples.get(1).getDouble("count(*)"));
    assertEquals(Double.valueOf(30.0D), tuples.get(1).getDouble("avg(price_f)"));
    assertEquals(Double.valueOf(60.0D), tuples.get(1).getDouble("sum(price_f)"));
    assertEquals(Double.valueOf(20.0D), tuples.get(1).getDouble("min(price_f)"));
    assertEquals(Double.valueOf(40.0D), tuples.get(1).getDouble("max(price_f)"));

    assertEquals("product5", tuples.get(2).getString("node"));
    assertEquals(Double.valueOf(1.0D), tuples.get(2).getDouble("count(*)"));
    assertEquals("product6", tuples.get(3).getString("node"));
    assertEquals(Double.valueOf(1.0D), tuples.get(3).getDouble("count(*)"));
    assertEquals("product7", tuples.get(4).getString("node"));
    assertEquals(Double.valueOf(1.0D), tuples.get(4).getDouble("count(*)"));

    //Test list of root nodes
    expr = "gatherNodes(collection1, " +
        "walk=\"product4, product7->product_s\"," +
        "gather=\"basket_s\")";

    stream = (GatherNodesStream)factory.constructStream(expr);

    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(3, tuples.size());
    assertEquals("basket2", tuples.get(0).getString("node"));
    assertEquals("basket3", tuples.get(1).getString("node"));
    assertEquals("basket4", tuples.get(2).getString("node"));

    //Test with negative filter query
    expr = "gatherNodes(collection1, " +
        "walk=\"product4, product7->product_s\"," +
        "gather=\"basket_s\", fq=\"-basket_s:basket4\")";

    stream = (GatherNodesStream)factory.constructStream(expr);

    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(2, tuples.size());
    assertEquals("basket2", tuples.get(0).getString("node"));
    assertEquals("basket3", tuples.get(1).getString("node"));
  } finally {
    // Always release the client cache and clear the index, even on failure.
    cache.close();
    del("*:*");
    commit();
  }
}
/**
 * Exercises {@code gatherNodes} over a small "who messaged whom" graph:
 * a literal root node, scatter of branches and leaves with traversal
 * tracking, a search-stream root, nested (two-level) traversals, two
 * traversals joined in a single expression, and a graph containing cycles.
 *
 * All checks use JUnit assertions (not the {@code assert} keyword) so they
 * are enforced regardless of whether the JVM runs with {@code -ea}. The
 * client cache is closed and the index is wiped in a {@code finally} block
 * so a failing assertion cannot leak resources or documents.
 */
private void testGatherNodesFriendsStream() throws Exception {

  indexr(id, "0", "from_s", "bill", "to_s", "jim", "message_t", "Hello jim");
  indexr(id, "1", "from_s", "bill", "to_s", "sam", "message_t", "Hello sam");
  indexr(id, "2", "from_s", "bill", "to_s", "max", "message_t", "Hello max");
  indexr(id, "3", "from_s", "max", "to_s", "kip", "message_t", "Hello kip");
  indexr(id, "4", "from_s", "sam", "to_s", "steve", "message_t", "Hello steve");
  indexr(id, "5", "from_s", "jim", "to_s", "ann", "message_t", "Hello steve");

  commit();

  List<Tuple> tuples = null;
  GatherNodesStream stream = null;
  StreamContext context = new StreamContext();
  SolrClientCache cache = new SolrClientCache();

  try {
    context.setSolrClientCache(cache);

    StreamFactory factory = new StreamFactory()
        .withCollectionZkHost("collection1", zkServer.getZkAddress())
        .withFunctionName("gatherNodes", GatherNodesStream.class)
        .withFunctionName("search", CloudSolrStream.class)
        .withFunctionName("count", CountMetric.class)
        .withFunctionName("hashJoin", HashJoinStream.class)
        .withFunctionName("avg", MeanMetric.class)
        .withFunctionName("sum", SumMetric.class)
        .withFunctionName("min", MinMetric.class)
        .withFunctionName("max", MaxMetric.class);

    // Literal root node: everyone bill sent a message to.
    String expr = "gatherNodes(collection1, " +
        "walk=\"bill->from_s\"," +
        "gather=\"to_s\")";

    stream = (GatherNodesStream)factory.constructStream(expr);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    // Results are sorted by node so positional assertions are stable.
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(3, tuples.size());
    assertEquals("jim", tuples.get(0).getString("node"));
    assertEquals("max", tuples.get(1).getString("node"));
    assertEquals("sam", tuples.get(2).getString("node"));

    //Test scatter branches, leaves and trackTraversal
    expr = "gatherNodes(collection1, " +
        "walk=\"bill->from_s\"," +
        "gather=\"to_s\","+
        "scatter=\"branches, leaves\", trackTraversal=\"true\")";

    stream = (GatherNodesStream)factory.constructStream(expr);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(4, tuples.size());
    assertEquals("bill", tuples.get(0).getString("node"));
    assertEquals(Long.valueOf(0), tuples.get(0).getLong("level"));
    assertEquals(0, tuples.get(0).getStrings("ancestors").size());

    assertEquals("jim", tuples.get(1).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(1).getLong("level"));
    List<String> ancestors = tuples.get(1).getStrings("ancestors");
    assertEquals(1, ancestors.size());
    assertEquals("bill", ancestors.get(0));

    assertEquals("max", tuples.get(2).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(2).getLong("level"));
    ancestors = tuples.get(2).getStrings("ancestors");
    assertEquals(1, ancestors.size());
    assertEquals("bill", ancestors.get(0));

    assertEquals("sam", tuples.get(3).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(3).getLong("level"));
    ancestors = tuples.get(3).getStrings("ancestors");
    assertEquals(1, ancestors.size());
    assertEquals("bill", ancestors.get(0));

    // Test query root
    expr = "gatherNodes(collection1, " +
        "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
        "walk=\"from_s->from_s\"," +
        "gather=\"to_s\")";

    stream = (GatherNodesStream)factory.constructStream(expr);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(3, tuples.size());
    assertEquals("jim", tuples.get(0).getString("node"));
    assertEquals("max", tuples.get(1).getString("node"));
    assertEquals("sam", tuples.get(2).getString("node"));

    // Test query root scatter branches
    expr = "gatherNodes(collection1, " +
        "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
        "walk=\"from_s->from_s\"," +
        "gather=\"to_s\", scatter=\"branches, leaves\")";

    stream = (GatherNodesStream)factory.constructStream(expr);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);

    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));
    assertEquals(4, tuples.size());
    assertEquals("bill", tuples.get(0).getString("node"));
    assertEquals(Long.valueOf(0), tuples.get(0).getLong("level"));
    assertEquals("jim", tuples.get(1).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(1).getLong("level"));
    assertEquals("max", tuples.get(2).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(2).getLong("level"));
    assertEquals("sam", tuples.get(3).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(3).getLong("level"));

    // Nested traversal: second level gathered from the first level's nodes.
    expr = "gatherNodes(collection1, " +
        "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
        "walk=\"from_s->from_s\"," +
        "gather=\"to_s\")";

    String expr2 = "gatherNodes(collection1, " +
        expr+","+
        "walk=\"node->from_s\"," +
        "gather=\"to_s\")";

    stream = (GatherNodesStream)factory.constructStream(expr2);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));

    assertEquals(3, tuples.size());
    assertEquals("ann", tuples.get(0).getString("node"));
    assertEquals("kip", tuples.get(1).getString("node"));
    assertEquals("steve", tuples.get(2).getString("node"));

    //Test two traversals in the same expression
    String expr3 = "hashJoin("+expr2+", hashed="+expr2+", on=\"node\")";

    HashJoinStream hstream = (HashJoinStream)factory.constructStream(expr3);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    hstream.setStreamContext(context);

    tuples = getTuples(hstream);
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));

    assertEquals(3, tuples.size());
    assertEquals("ann", tuples.get(0).getString("node"));
    assertEquals("kip", tuples.get(1).getString("node"));
    assertEquals("steve", tuples.get(2).getString("node"));

    //=================================
    // Nested traversal emitting branches as well as leaves.

    expr = "gatherNodes(collection1, " +
        "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
        "walk=\"from_s->from_s\"," +
        "gather=\"to_s\")";

    expr2 = "gatherNodes(collection1, " +
        expr+","+
        "walk=\"node->from_s\"," +
        "gather=\"to_s\", scatter=\"branches, leaves\")";

    stream = (GatherNodesStream)factory.constructStream(expr2);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));

    assertEquals(7, tuples.size());
    assertEquals("ann", tuples.get(0).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(0).getLong("level"));
    assertEquals("bill", tuples.get(1).getString("node"));
    assertEquals(Long.valueOf(0), tuples.get(1).getLong("level"));
    assertEquals("jim", tuples.get(2).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(2).getLong("level"));
    assertEquals("kip", tuples.get(3).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(3).getLong("level"));
    assertEquals("max", tuples.get(4).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(4).getLong("level"));
    assertEquals("sam", tuples.get(5).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(5).getLong("level"));
    assertEquals("steve", tuples.get(6).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(6).getLong("level"));

    //Add cycles from jim and from sam back to bill
    indexr(id, "6", "from_s", "jim", "to_s", "bill", "message_t", "Hello steve");
    indexr(id, "7", "from_s", "sam", "to_s", "bill", "message_t", "Hello steve");

    commit();

    expr = "gatherNodes(collection1, " +
        "search(collection1, q=\"message_t:jim\", fl=\"from_s\", sort=\"from_s asc\"),"+
        "walk=\"from_s->from_s\"," +
        "gather=\"to_s\", trackTraversal=\"true\")";

    expr2 = "gatherNodes(collection1, " +
        expr+","+
        "walk=\"node->from_s\"," +
        "gather=\"to_s\", scatter=\"branches, leaves\", trackTraversal=\"true\")";

    stream = (GatherNodesStream)factory.constructStream(expr2);
    context = new StreamContext();
    context.setSolrClientCache(cache);
    stream.setStreamContext(context);

    tuples = getTuples(stream);
    Collections.sort(tuples, new FieldComparator("node", ComparatorOrder.ASCENDING));

    // The cycles must not add nodes or change levels; only bill's ancestors change.
    assertEquals(7, tuples.size());
    assertEquals("ann", tuples.get(0).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(0).getLong("level"));
    //Bill should now have two ancestors: jim and sam
    assertEquals("bill", tuples.get(1).getString("node"));
    assertEquals(Long.valueOf(0), tuples.get(1).getLong("level"));
    assertEquals(2, tuples.get(1).getStrings("ancestors").size());
    List<String> anc = tuples.get(1).getStrings("ancestors");

    // Ancestor order is not asserted as returned; sort before comparing.
    Collections.sort(anc);
    assertEquals("jim", anc.get(0));
    assertEquals("sam", anc.get(1));

    assertEquals("jim", tuples.get(2).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(2).getLong("level"));
    assertEquals("kip", tuples.get(3).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(3).getLong("level"));
    assertEquals("max", tuples.get(4).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(4).getLong("level"));
    assertEquals("sam", tuples.get(5).getString("node"));
    assertEquals(Long.valueOf(1), tuples.get(5).getLong("level"));
    assertEquals("steve", tuples.get(6).getString("node"));
    assertEquals(Long.valueOf(2), tuples.get(6).getLong("level"));
  } finally {
    // Always release the client cache and clear the index, even on failure.
    cache.close();
    del("*:*");
    commit();
  }
}
protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException { protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
tupleStream.open(); tupleStream.open();
List<Tuple> tuples = new ArrayList<Tuple>(); List<Tuple> tuples = new ArrayList();
for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) { for(Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
tuples.add(t); tuples.add(t);
} }

View File

@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Collections;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
@ -140,6 +141,7 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
testNulls(); testNulls();
testTopicStream(); testTopicStream();
testDaemonStream(); testDaemonStream();
testRandomStream();
testParallelUniqueStream(); testParallelUniqueStream();
testParallelReducerStream(); testParallelReducerStream();
testParallelRankStream(); testParallelRankStream();
@ -558,6 +560,76 @@ public class StreamExpressionTest extends AbstractFullDistribZkTestBase {
commit(); commit();
} }
/**
 * Exercises the {@code random} stream expression: two runs over the same
 * five documents must each return all five, in differing order, and contain
 * the same ids once sorted; a run with {@code rows="1"} must return exactly
 * one tuple.
 *
 * All checks use JUnit assertions rather than the {@code assert} keyword so
 * they are enforced even when the JVM runs without {@code -ea}.
 */
private void testRandomStream() throws Exception {

  indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0");
  indexr(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0");
  indexr(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3");
  indexr(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4");
  indexr(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1");
  commit();

  StreamExpression expression;
  TupleStream stream;

  StreamFactory factory = new StreamFactory()
      .withCollectionZkHost("collection1", zkServer.getZkAddress())
      .withFunctionName("random", RandomStream.class);

  StreamContext context = new StreamContext();
  SolrClientCache cache = new SolrClientCache();
  try {
    context.setSolrClientCache(cache);

    // rows exceeds the document count, so each run should return all five docs.
    expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
    stream = factory.constructStream(expression);
    stream.setStreamContext(context);
    List<Tuple> tuples1 = getTuples(stream);
    assertEquals(5, tuples1.size());

    expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"10\", fl=\"id, a_i\")");
    stream = factory.constructStream(expression);
    stream.setStreamContext(context);
    List<Tuple> tuples2 = getTuples(stream);
    assertEquals(5, tuples2.size());

    // The two runs are expected to come back in different orders.
    // NOTE(review): two independent random orderings of five documents could
    // presumably coincide by chance, which would make this check fail
    // sporadically -- confirm against RandomStream's seeding.
    boolean different = false;
    for (int i = 0; i < tuples1.size(); i++) {
      Tuple tuple1 = tuples1.get(i);
      Tuple tuple2 = tuples2.get(i);
      if (!tuple1.get("id").equals(tuple2.get("id"))) {
        different = true;
        break;
      }
    }

    assertTrue(different);

    // Once both result sets are sorted by id they must line up pairwise,
    // i.e. both runs returned the same set of documents.
    Collections.sort(tuples1, new FieldComparator("id", ComparatorOrder.ASCENDING));
    Collections.sort(tuples2, new FieldComparator("id", ComparatorOrder.ASCENDING));

    for (int i = 0; i < tuples1.size(); i++) {
      Tuple tuple1 = tuples1.get(i);
      Tuple tuple2 = tuples2.get(i);
      assertEquals(tuple1.get("id"), tuple2.get("id"));
    }

    // rows="1" must limit the result to a single tuple.
    expression = StreamExpressionParser.parse("random(collection1, q=\"*:*\", rows=\"1\", fl=\"id, a_i\")");
    stream = factory.constructStream(expression);
    stream.setStreamContext(context);
    List<Tuple> tuples3 = getTuples(stream);
    assertEquals(1, tuples3.size());

  } finally {
    // Always release the client cache and clear the index, even on failure.
    cache.close();
    del("*:*");
    commit();
  }
}
private void testReducerStream() throws Exception{ private void testReducerStream() throws Exception{
indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1"); indexr(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1");
indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2"); indexr(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2");