diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index d00972b4d0e..a0814d0e055 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -61,7 +61,7 @@ Optimizations multiple polygons and holes, with memory usage independent of polygon complexity. (Karl Wright, Mike McCandless, Robert Muir) -* LUCENE-7159, LUCENE-7222: Speed up LatLonPoint polygon performance for complex +* LUCENE-7159, LUCENE-7222, LUCENE-7229: Speed up LatLonPoint polygon performance for complex polygons. (Robert Muir) * LUCENE-7211: Reduce memory & GC for spatial RPT Intersects when the number of diff --git a/lucene/core/src/java/org/apache/lucene/geo/Polygon.java b/lucene/core/src/java/org/apache/lucene/geo/Polygon.java index 75788dd2543..361c199fc31 100644 --- a/lucene/core/src/java/org/apache/lucene/geo/Polygon.java +++ b/lucene/core/src/java/org/apache/lucene/geo/Polygon.java @@ -48,9 +48,6 @@ public final class Polygon { /** maximum longitude of this polygon's bounding box area */ public final double maxLon; - // TODO: refactor to GeoUtils once LUCENE-7165 is complete - private static final double ENCODING_TOLERANCE = 1e-6; - // TODO: we could also compute the maximal inner bounding box, to make relations faster to compute? /** @@ -234,70 +231,79 @@ public final class Polygon { return containsCount; } - private boolean crossesSlowly(double minLat, double maxLat, final double minLon, final double maxLon) { - /* - * Accurately compute (within restrictions of cartesian decimal degrees) whether a rectangle crosses a polygon - */ - final double[] boxLats = new double[] { minLat, minLat, maxLat, maxLat, minLat }; - final double[] boxLons = new double[] { minLon, maxLon, maxLon, minLon, minLon }; + /** Returns true if the box crosses our polygon */ + private boolean crossesSlowly(double minLat, double maxLat, double minLon, double maxLon) { + // we compute line intersections of every polygon edge with every box line. + // if we find one, return true. 
+ // for each box line (AB): + // for each poly line (CD): + // intersects = orient(C,D,A) * orient(C,D,B) <= 0 && orient(A,B,C) * orient(A,B,D) <= 0 + for (int i = 1; i < polyLons.length; i++) { + double cy = polyLats[i - 1]; + double dy = polyLats[i]; + double cx = polyLons[i - 1]; + double dx = polyLons[i]; - // computes the intersection point between each bbox edge and the polygon edge - for (int b=0; b<4; ++b) { - double a1 = boxLats[b+1]-boxLats[b]; - double b1 = boxLons[b]-boxLons[b+1]; - double c1 = a1*boxLons[b+1] + b1*boxLats[b+1]; - for (int p=0; p s) { - continue; // out of range - } - double x01 = Math.max(boxLons[b], boxLons[b+1]) + ENCODING_TOLERANCE; - if (x01 < s) { - continue; // out of range - } - double x10 = Math.min(polyLons[p], polyLons[p+1]) - ENCODING_TOLERANCE; - if (x10 > s) { - continue; // out of range - } - double x11 = Math.max(polyLons[p], polyLons[p+1]) + ENCODING_TOLERANCE; - if (x11 < s) { - continue; // out of range - } + // optimization: see if the rectangle is outside of the "bounding box" of the polyline at all + // if not, don't waste our time trying more complicated stuff + if ((cy < minLat && dy < minLat) || + (cy > maxLat && dy > maxLat) || + (cx < minLon && dx < minLon) || + (cx > maxLon && dx > maxLon)) { + continue; + } - double t = (1/d)*(a1*c2 - a2*c1); - double y00 = Math.min(boxLats[b], boxLats[b+1]) - ENCODING_TOLERANCE; - if (y00 > t || (x00 == s && y00 == t)) { - continue; // out of range or touching - } - double y01 = Math.max(boxLats[b], boxLats[b+1]) + ENCODING_TOLERANCE; - if (y01 < t || (x01 == s && y01 == t)) { - continue; // out of range or touching - } - double y10 = Math.min(polyLats[p], polyLats[p+1]) - ENCODING_TOLERANCE; - if (y10 > t || (x10 == s && y10 == t)) { - continue; // out of range or touching - } - double y11 = Math.max(polyLats[p], polyLats[p+1]) + ENCODING_TOLERANCE; - if (y11 < t || (x11 == s && y11 == t)) { - continue; // out of range or touching - } - // if line segments are not 
touching and the intersection point is within the range of either segment - return true; - } - } // for each poly edge - } // for each bbox edge + // does box's top edge intersect polyline? + // ax = minLon, bx = maxLon, ay = maxLat, by = maxLat + if (orient(cx, cy, dx, dy, minLon, maxLat) * orient(cx, cy, dx, dy, maxLon, maxLat) <= 0 && + orient(minLon, maxLat, maxLon, maxLat, cx, cy) * orient(minLon, maxLat, maxLon, maxLat, dx, dy) <= 0) { + return true; + } + + // does box's right edge intersect polyline? + // ax = maxLon, bx = maxLon, ay = maxLat, by = minLat + if (orient(cx, cy, dx, dy, maxLon, maxLat) * orient(cx, cy, dx, dy, maxLon, minLat) <= 0 && + orient(maxLon, maxLat, maxLon, minLat, cx, cy) * orient(maxLon, maxLat, maxLon, minLat, dx, dy) <= 0) { + return true; + } + + // does box's bottom edge intersect polyline? + // ax = maxLon, bx = minLon, ay = minLat, by = minLat + if (orient(cx, cy, dx, dy, maxLon, minLat) * orient(cx, cy, dx, dy, minLon, minLat) <= 0 && + orient(maxLon, minLat, minLon, minLat, cx, cy) * orient(maxLon, minLat, minLon, minLat, dx, dy) <= 0) { + return true; + } + + // does box's left edge intersect polyline? + // ax = minLon, bx = minLon, ay = minLat, by = maxLat + if (orient(cx, cy, dx, dy, minLon, minLat) * orient(cx, cy, dx, dy, minLon, maxLat) <= 0 && + orient(minLon, minLat, minLon, maxLat, cx, cy) * orient(minLon, minLat, minLon, maxLat, dx, dy) <= 0) { + return true; + } + } return false; } + /** + * Returns a positive value if points a, b, and c are arranged in counter-clockwise order, + * negative value if clockwise, zero if collinear. + */ + // see the "Orient2D" method described here: + // http://www.cs.berkeley.edu/~jrs/meshpapers/robnotes.pdf + // https://www.cs.cmu.edu/~quake/robust.html + // Note that this one does not yet have the floating point tricks to be exact! 
+ private static int orient(double ax, double ay, double bx, double by, double cx, double cy) { + double v1 = (bx - ax) * (cy - ay); + double v2 = (cx - ax) * (by - ay); + if (v1 > v2) { + return 1; + } else if (v1 < v2) { + return -1; + } else { + return 0; + } + } + /** Returns a copy of the internal latitude array */ public double[] getPolyLats() { return polyLats.clone(); diff --git a/lucene/core/src/java/org/apache/lucene/util/SloppyMath.java b/lucene/core/src/java/org/apache/lucene/util/SloppyMath.java index 7be9be1c4ad..0bfca5e6f5e 100644 --- a/lucene/core/src/java/org/apache/lucene/util/SloppyMath.java +++ b/lucene/core/src/java/org/apache/lucene/util/SloppyMath.java @@ -37,7 +37,7 @@ public class SloppyMath { * specified in decimal degrees (latitude/longitude). This works correctly * even if the dateline is between the two points. *

- * Error is at most 2E-1 (20cm) from the actual haversine distance, but is typically + * Error is at most 4E-1 (40cm) from the actual haversine distance, but is typically * much smaller for reasonable distances: around 1E-5 (0.01mm) for distances less than * 1000km. * diff --git a/lucene/core/src/test/org/apache/lucene/util/TestSloppyMath.java b/lucene/core/src/test/org/apache/lucene/util/TestSloppyMath.java index 6a2eb86b2b6..488f00e1a4b 100644 --- a/lucene/core/src/test/org/apache/lucene/util/TestSloppyMath.java +++ b/lucene/core/src/test/org/apache/lucene/util/TestSloppyMath.java @@ -33,7 +33,7 @@ public class TestSloppyMath extends LuceneTestCase { // accuracy for asin() static double ASIN_DELTA = 1E-7; // accuracy for haversinMeters() - static double HAVERSIN_DELTA = 2E-1; + static double HAVERSIN_DELTA = 38E-2; // accuracy for haversinMeters() for "reasonable" distances (< 1000km) static double REASONABLE_HAVERSIN_DELTA = 1E-5; @@ -161,11 +161,28 @@ public class TestSloppyMath extends LuceneTestCase { double lat2 = GeoTestUtil.nextLatitude(); double lon2 = GeoTestUtil.nextLongitude(); - double expected = haversinMeters(lat1, lon1, lat2, lon2); - double actual = slowHaversin(lat1, lon1, lat2, lon2); + double expected = slowHaversin(lat1, lon1, lat2, lon2); + double actual = haversinMeters(lat1, lon1, lat2, lon2); assertEquals(expected, actual, HAVERSIN_DELTA); } } + + /** + * Step across the whole world to find huge absolute errors. + * Don't rely on random number generator to pick these massive distances. 
*/ + public void testAcrossWholeWorldSteps() { + for (int lat1 = -90; lat1 <= 90; lat1 += 10) { + for (int lon1 = -180; lon1 <= 180; lon1 += 10) { + for (int lat2 = -90; lat2 <= 90; lat2 += 10) { + for (int lon2 = -180; lon2 <= 180; lon2 += 10) { + double expected = slowHaversin(lat1, lon1, lat2, lon2); + double actual = haversinMeters(lat1, lon1, lat2, lon2); + assertEquals(expected, actual, HAVERSIN_DELTA); + } + } + } + } + } public void testAgainstSlowVersionReasonable() { for (int i = 0; i < 100_000; i++) { diff --git a/lucene/sandbox/src/java/org/apache/lucene/document/LatLonGrid.java b/lucene/sandbox/src/java/org/apache/lucene/document/LatLonGrid.java index e5718e24458..db627298992 100644 --- a/lucene/sandbox/src/java/org/apache/lucene/document/LatLonGrid.java +++ b/lucene/sandbox/src/java/org/apache/lucene/document/LatLonGrid.java @@ -48,7 +48,7 @@ import static org.apache.lucene.geo.GeoEncodingUtils.decodeLongitude; // relational operations as rectangle <-> rectangle relations in integer space in log(n) time.. final class LatLonGrid { // must be a power of two! - static final int GRID_SIZE = 1<<5; + static final int GRID_SIZE = 1<<7; final int minLat; final int maxLat; final int minLon; diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 0a784c48358..c16e757630c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -129,6 +129,9 @@ Bug Fixes * SOLR-9004: Fix "name" field type definition in films example. (Alexandre Rafalovitch via Varun Thacker) +* SOLR-8983: Cleanup clusterstate and replicas for a failed create collection request + (Varun Thacker, Anshum Gupta) + Optimizations ---------------------- * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation. 
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index 503ff291653..6bff6481fd4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -1961,10 +1961,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet()); - - log.debug("Finished create command on all shards for collection: " - + collectionName); - + if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) { + // Let's cleanup as we hit an exception + // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success' + // element, which may be interpreted by the user as a positive ack + cleanupCollection(collectionName, new NamedList()); + log.info("Cleaned up artifacts for failed create collection for [" + collectionName + "]"); + } else { + log.debug("Finished create command on all shards for collection: " + + collectionName); + } } catch (SolrException ex) { throw ex; } catch (Exception ex) { @@ -1972,6 +1978,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler } } + + private void cleanupCollection(String collectionName, NamedList results) throws KeeperException, InterruptedException { + log.error("Cleaning up collection [" + collectionName + "]." 
); + Map props = makeMap( + Overseer.QUEUE_OPERATION, DELETE.toLower(), + NAME, collectionName); + deleteCollection(new ZkNodeProps(props), results); + } + private Map identifyNodes(ClusterState clusterState, List nodeList, ZkNodeProps message, diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 5ddd3129313..7c47c768338 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.graph.GatherNodesStream; import org.apache.solr.client.solrj.io.graph.ShortestPathStream; import org.apache.solr.client.solrj.io.ops.ConcatOperation; import org.apache.solr.client.solrj.io.ops.DistinctOperation; @@ -117,11 +118,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, .withFunctionName("outerHashJoin", OuterHashJoinStream.class) .withFunctionName("intersect", IntersectStream.class) .withFunctionName("complement", ComplementStream.class) - .withFunctionName("daemon", DaemonStream.class) - .withFunctionName("sort", SortStream.class) - - // graph streams - .withFunctionName("shortestPath", ShortestPathStream.class) + .withFunctionName("sort", SortStream.class) + .withFunctionName("daemon", DaemonStream.class) + .withFunctionName("shortestPath", ShortestPathStream.class) + .withFunctionName("gatherNodes", GatherNodesStream.class) // metrics .withFunctionName("min", MinMetric.class) @@ -274,6 +274,14 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, public Tuple read() { String msg = e.getMessage(); + + Throwable t = e.getCause(); + while(t != null) { + msg = t.getMessage(); + t = t.getCause(); + } + 
+ Map m = new HashMap(); m.put("EOF", true); m.put("EXCEPTION", msg); diff --git a/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java index a5c4969a603..26fe2d7e4ee 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/FieldMutatingUpdateProcessorFactory.java @@ -77,7 +77,7 @@ import org.apache.solr.util.plugin.SolrCoreAware; * In the ExampleFieldMutatingUpdateProcessorFactory configured below, * fields will be mutated if the name starts with "foo" or "bar"; * unless the field name contains the substring "SKIP" or - * the fieldType is (or subclasses) DateField. Meaning a field named + * the fieldType is (or subclasses) TrieDateField. Meaning a field named * "foo_SKIP" is guaranteed not to be selected, but a field named "bar_smith" * that uses StrField will be selected. *

diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java index ad6df12c5bd..accc36a6ed5 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java @@ -26,8 +26,19 @@ import java.lang.management.ManagementFactory; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.lucene.util.LuceneTestCase.Slow; @@ -39,7 +50,6 @@ import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.Create; @@ -375,6 +385,14 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa } + private NamedList makeRequest(String baseUrl, SolrRequest request, int socketTimeout) + throws SolrServerException, IOException { + try (SolrClient client = createNewSolrClient("", baseUrl)) { + ((HttpSolrClient) client).setSoTimeout(socketTimeout); + return client.request(request); + } + } + private NamedList makeRequest(String baseUrl, SolrRequest request) throws 
SolrServerException, IOException { try (SolrClient client = createNewSolrClient("", baseUrl)) { @@ -525,8 +543,7 @@ public class CollectionsAPIDistributedZkTest extends AbstractFullDistribZkTestBa params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2); request = new QueryRequest(params); request.setPath("/admin/collections"); - gotExp = false; - NamedList resp = makeRequest(baseUrl, request);; + NamedList resp = makeRequest(baseUrl, request, 60000); SimpleOrderedMap success = (SimpleOrderedMap) resp.get("success"); SimpleOrderedMap failure = (SimpleOrderedMap) resp.get("failure"); diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java new file mode 100644 index 00000000000..2d1a7f8d888 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java @@ -0,0 +1,84 @@ +package org.apache.solr.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.common.params.CoreAdminParams; +import org.junit.BeforeClass; +import org.junit.Test; + +public class CreateCollectionCleanupTest extends SolrCloudTestCase { + + protected static final String CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT = "\n" + + "\n" + + " ${shareSchema:false}\n" + + " ${configSetBaseDir:configsets}\n" + + " ${coreRootDirectory:.}\n" + + "\n" + + " \n" + + " ${urlScheme:}\n" + + " ${socketTimeout:90000}\n" + + " ${connTimeout:15000}\n" + + " \n" + + "\n" + + " \n" + + " 127.0.0.1\n" + + " ${hostPort:8983}\n" + + " ${hostContext:solr}\n" + + " ${solr.zkclienttimeout:30000}\n" + + " ${genericCoreNodeNames:true}\n" + + " 10000\n" + + " ${distribUpdateConnTimeout:45000}\n" + + " ${distribUpdateSoTimeout:340000}\n" + + " ${createCollectionWaitTimeTillActive:10}\n" + + " \n" + + " \n" + + "\n"; + + + @BeforeClass + public static void createCluster() throws Exception { + configureCluster(1) + .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .withSolrXml(CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT) + .configure(); + } + + @Test + public void testCreateCollectionCleanup() throws Exception { + final CloudSolrClient cloudClient = cluster.getSolrClient(); + // Create a collection that would fail + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("foo","conf1",1,1); + + Properties properties = new Properties(); + properties.put(CoreAdminParams.DATA_DIR, "/some_invalid_dir/foo"); + create.setProperties(properties); + CollectionAdminResponse rsp = create.process(cloudClient); + assertFalse(rsp.isSuccess()); + + // Confirm using LIST that the collection does not exist + CollectionAdminRequest.List list = 
CollectionAdminRequest.listCollections(); + rsp = list.process(cloudClient); + assertFalse(((ArrayList) rsp.getResponse().get("collections")).contains("foo")); + } +} diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java index 28dcc821988..3bf6b7066dc 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java @@ -20,7 +20,6 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -28,17 +27,15 @@ import java.util.concurrent.TimeUnit; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.TestUtil; import org.apache.solr.cloud.SolrCloudTestCase; -import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; -import org.apache.solr.core.CloudConfig.CloudConfigBuilder; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; @@ -80,6 +77,12 @@ public class TestStressLiveNodes extends SolrCloudTestCase { ZK_SERVER_ADDR = cluster.getZkServer().getZkAddress(); } + + @AfterClass + private static void afterClass() throws Exception { + CLOUD_CLIENT.close(); + CLOUD_CLIENT = null; + } private static SolrZkClient newSolrZkClient() { assertNotNull(ZK_SERVER_ADDR); diff --git a/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java 
b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java index 3e51b36e1bd..2ef7083429f 100644 --- a/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java +++ b/solr/core/src/test/org/apache/solr/core/BlobRepositoryCloudTest.java @@ -2,15 +2,12 @@ package org.apache.solr.core; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.HashMap; import java.util.Set; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; @@ -19,6 +16,8 @@ import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.cloud.ZkTestServer; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.handler.TestBlobHandler; import org.apache.zookeeper.server.DataNode; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.ZKDatabase; @@ -54,10 +53,9 @@ public class BlobRepositoryCloudTest extends SolrCloudTestCase { // Thread.sleep(2000); HashMap params = new HashMap<>(); cluster.createCollection(".system", 1, 1, null, params); -// Thread.sleep(2000); // test component will fail if it cant' find a blob with this data by this name - postBlob("testResource", "foo,bar\nbaz,bam"); -// Thread.sleep(2000); + TestBlobHandler.postData(cluster.getSolrClient(), findLiveNodeURI(), "testResource", ByteBuffer.wrap("foo,bar\nbaz,bam".getBytes(StandardCharsets.UTF_8))); + // Thread.sleep(2000); // if these don't load we probably failed to post the blob above cluster.createCollection("col1", 1, 1, 
"configname", params); cluster.createCollection("col2", 1, 1, "configname", params); @@ -95,28 +93,10 @@ public class BlobRepositoryCloudTest extends SolrCloudTestCase { assertLastQueryToCollection("col2"); } - // TODO: move this up to parent class? Probably accepting entity, or with alternative signatures - private static void postBlob(String name, String string) throws IOException { - HttpPost post = new HttpPost(findLiveNodeURI() + "/.system/blob/" + name); - StringEntity csv = new StringEntity(string, ContentType.create("application/octet-stream")); - post.setEntity(csv); - try (CloseableHttpClient httpclient = HttpClients.createDefault()) { - httpclient.execute(post); - } - } - // TODO: move this up to parent class? private static String findLiveNodeURI() { - ZkTestServer zkServer = cluster.getZkServer(); - ZKDatabase zkDatabase = zkServer.getZKDatabase(); - DataTree dataTree = zkDatabase.getDataTree(); - DataNode node = dataTree.getNode("/solr/live_nodes"); - Set children = node.getChildren(); - String liveNode = children.iterator().next(); - String[] split = liveNode.split("_"); - String host = split[0]; - String name = split[1]; - return "http://" + host + "/" + name; + ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + return zkStateReader.getBaseUrlForNodeName(zkStateReader.getClusterState().getCollection(".system").getSlices().iterator().next().getLeader().getNodeName()); } private void assertLastQueryToCollection(String collection) throws SolrServerException, IOException { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java new file mode 100644 index 00000000000..759aa0f8431 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java @@ -0,0 +1,580 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.graph; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.solr.client.solrj.io.eq.MultipleFieldEqualitor; +import org.apache.solr.client.solrj.io.stream.*; +import org.apache.solr.client.solrj.io.stream.metrics.*; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.eq.FieldEqualitor; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.util.ExecutorUtil; +import 
org.apache.solr.common.util.SolrjNamedThreadFactory; + +public class GatherNodesStream extends TupleStream implements Expressible { + + private String zkHost; + private String collection; + private StreamContext streamContext; + private Map queryParams; + private String traverseFrom; + private String traverseTo; + private String gather; + private boolean trackTraversal; + private boolean useDefaultTraversal; + + private TupleStream tupleStream; + private Set scatter; + private Iterator out; + private Traversal traversal; + private List metrics; + + public GatherNodesStream(String zkHost, + String collection, + TupleStream tupleStream, + String traverseFrom, + String traverseTo, + String gather, + Map queryParams, + List metrics, + boolean trackTraversal, + Set scatter) { + + init(zkHost, + collection, + tupleStream, + traverseFrom, + traverseTo, + gather, + queryParams, + metrics, + trackTraversal, + scatter); + } + + public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException { + + + String collectionName = factory.getValueOperand(expression, 0); + List namedParams = factory.getNamedOperands(expression); + StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); + + List streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class); + // Collection Name + if(null == collectionName) { + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); + } + + + Set scatter = new HashSet(); + + StreamExpressionNamedParameter scatterExpression = factory.getNamedOperand(expression, "scatter"); + + if(scatterExpression == null) { + scatter.add(Traversal.Scatter.LEAVES); + } else { + String s = ((StreamExpressionValue)scatterExpression.getParameter()).getValue(); + String[] sArray = s.split(","); + for(String sv : sArray) { + sv = sv.trim(); + 
if(Traversal.Scatter.BRANCHES.toString().equalsIgnoreCase(sv)) { + scatter.add(Traversal.Scatter.BRANCHES); + } else if (Traversal.Scatter.LEAVES.toString().equalsIgnoreCase(sv)) { + scatter.add(Traversal.Scatter.LEAVES); + } + } + } + + String gather = null; + StreamExpressionNamedParameter gatherExpression = factory.getNamedOperand(expression, "gather"); + + if(gatherExpression == null) { + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - from param is required",expression)); + } else { + gather = ((StreamExpressionValue)gatherExpression.getParameter()).getValue(); + } + + String traverseFrom = null; + String traverseTo = null; + StreamExpressionNamedParameter edgeExpression = factory.getNamedOperand(expression, "walk"); + + TupleStream stream = null; + + if(edgeExpression == null) { + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - walk param is required", expression)); + } else { + if(streamExpressions.size() > 0) { + stream = factory.constructStream(streamExpressions.get(0)); + String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue(); + String[] fields = edge.split("->"); + if (fields.length != 2) { + throw new IOException(String.format(Locale.ROOT, "invalid expression %s - walk param separated by an -> and must contain two fields", expression)); + } + traverseFrom = fields[0].trim(); + traverseTo = fields[1].trim(); + } else { + String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue(); + String[] fields = edge.split("->"); + if (fields.length != 2) { + throw new IOException(String.format(Locale.ROOT, "invalid expression %s - walk param separated by an -> and must contain two fields", expression)); + } + + String[] rootNodes = fields[0].split(","); + List l = new ArrayList(); + for(String n : rootNodes) { + l.add(n.trim()); + } + + stream = new NodeStream(l); + traverseFrom = "node"; + traverseTo = fields[1].trim(); + } + } + + List metricExpressions = 
factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class); + List metrics = new ArrayList(); + for(int idx = 0; idx < metricExpressions.size(); ++idx){ + metrics.add(factory.constructMetric(metricExpressions.get(idx))); + } + + boolean trackTraversal = false; + + StreamExpressionNamedParameter trackExpression = factory.getNamedOperand(expression, "trackTraversal"); + + if(trackExpression != null) { + trackTraversal = Boolean.parseBoolean(((StreamExpressionValue) trackExpression.getParameter()).getValue()); + } else { + useDefaultTraversal = true; + } + + StreamExpressionNamedParameter scopeExpression = factory.getNamedOperand(expression, "localScope"); + + if(trackExpression != null) { + trackTraversal = Boolean.parseBoolean(((StreamExpressionValue) trackExpression.getParameter()).getValue()); + } + + Map params = new HashMap(); + for(StreamExpressionNamedParameter namedParam : namedParams){ + if(!namedParam.getName().equals("zkHost") && + !namedParam.getName().equals("gather") && + !namedParam.getName().equals("walk") && + !namedParam.getName().equals("scatter") && + !namedParam.getName().equals("trackTraversal")) + { + params.put(namedParam.getName(), namedParam.getParameter().toString().trim()); + } + } + + // zkHost, optional - if not provided then will look into factory list to get + String zkHost = null; + if(null == zkHostExpression){ + zkHost = factory.getCollectionZkHost(collectionName); + if(zkHost == null) { + zkHost = factory.getDefaultZkHost(); + } + } else if(zkHostExpression.getParameter() instanceof StreamExpressionValue) { + zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); + } + + if(null == zkHost){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + } + + // We've got all the required items + init(zkHost, + collectionName, + stream, + traverseFrom, + traverseTo , + gather, + params, + metrics, 
+ trackTraversal, + scatter); + } + + private void init(String zkHost, + String collection, + TupleStream tupleStream, + String traverseFrom, + String traverseTo, + String gather, + Map queryParams, + List metrics, + boolean trackTraversal, + Set scatter) { + this.zkHost = zkHost; + this.collection = collection; + this.tupleStream = tupleStream; + this.traverseFrom = traverseFrom; + this.traverseTo = traverseTo; + this.gather = gather; + this.queryParams = queryParams; + this.metrics = metrics; + this.trackTraversal = trackTraversal; + this.scatter = scatter; + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // collection + expression.addParameter(collection); + + if(tupleStream instanceof Expressible){ + expression.addParameter(((Expressible)tupleStream).toExpression(factory)); + } + else{ + throw new IOException("This GatherNodesStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + + Set entries = queryParams.entrySet(); + // parameters + for(Map.Entry param : entries){ + String value = param.getValue().toString(); + + // SOLR-8409: This is a special case where the params contain a " character + // Do note that in any other BASE streams with parameters where a " might come into play + // that this same replacement needs to take place. 
+ value = value.replace("\"", "\\\""); + + expression.addParameter(new StreamExpressionNamedParameter(param.getKey().toString(), value)); + } + + if(metrics != null) { + for (Metric metric : metrics) { + expression.addParameter(metric.toExpression(factory)); + } + } + + expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); + expression.addParameter(new StreamExpressionNamedParameter("gather", gather)); + expression.addParameter(new StreamExpressionNamedParameter("walk", traverseFrom+"->"+traverseTo)); + expression.addParameter(new StreamExpressionNamedParameter("trackTraversal", Boolean.toString(trackTraversal))); + + StringBuilder buf = new StringBuilder(); + for(Traversal.Scatter sc : scatter) { + if(buf.length() > 0 ) { + buf.append(","); + } + buf.append(sc.toString()); + } + + expression.addParameter(new StreamExpressionNamedParameter("scatter", buf.toString())); + + return expression; + } + + public void setStreamContext(StreamContext context) { + this.traversal = (Traversal) context.get("traversal"); + if (traversal == null) { + //No traversal in the context. So create a new context and a new traversal. + //This ensures that two separate traversals in the same expression don't pollute each others traversal. 
+ StreamContext localContext = new StreamContext(); + + localContext.numWorkers = context.numWorkers; + localContext.workerID = context.workerID; + localContext.setSolrClientCache(context.getSolrClientCache()); + localContext.setStreamFactory(context.getStreamFactory()); + + for(Object key :context.getEntries().keySet()) { + localContext.put(key, context.get(key)); + } + + traversal = new Traversal(); + + localContext.put("traversal", traversal); + + this.tupleStream.setStreamContext(localContext); + this.streamContext = localContext; + } else { + this.tupleStream.setStreamContext(context); + this.streamContext = context; + } + } + + public List children() { + List l = new ArrayList(); + l.add(tupleStream); + return l; + } + + public void open() throws IOException { + tupleStream.open(); + } + + private class JoinRunner implements Callable> { + + private List nodes; + private List edges = new ArrayList(); + + public JoinRunner(List nodes) { + this.nodes = nodes; + } + + public List call() { + + Map joinParams = new HashMap(); + Set flSet = new HashSet(); + flSet.add(gather); + flSet.add(traverseTo); + + //Add the metric columns + + if(metrics != null) { + for(Metric metric : metrics) { + for(String column : metric.getColumns()) { + flSet.add(column); + } + } + } + + if(queryParams.containsKey("fl")) { + String flString = (String)queryParams.get("fl"); + String[] flArray = flString.split(","); + for(String f : flArray) { + flSet.add(f.trim()); + } + } + + Iterator it = flSet.iterator(); + StringBuilder buf = new StringBuilder(); + while(it.hasNext()) { + buf.append(it.next()); + if(it.hasNext()) { + buf.append(","); + } + } + + joinParams.putAll(queryParams); + joinParams.put("fl", buf.toString()); + joinParams.put("qt", "/export"); + joinParams.put("sort", gather + " asc,"+traverseTo +" asc"); + + StringBuffer nodeQuery = new StringBuffer(); + + for(String node : nodes) { + nodeQuery.append(node).append(" "); + } + + String q = traverseTo + ":(" + 
nodeQuery.toString().trim() + ")"; + + + joinParams.put("q", q); + TupleStream stream = null; + try { + stream = new UniqueStream(new CloudSolrStream(zkHost, collection, joinParams), new MultipleFieldEqualitor(new FieldEqualitor(gather), new FieldEqualitor(traverseTo))); + stream.setStreamContext(streamContext); + stream.open(); + BATCH: + while (true) { + Tuple tuple = stream.read(); + if (tuple.EOF) { + break BATCH; + } + + edges.add(tuple); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch(Exception ce) { + throw new RuntimeException(ce); + } + } + return edges; + } + } + + + public void close() throws IOException { + tupleStream.close(); + } + + public Tuple read() throws IOException { + + if (out == null) { + List joinBatch = new ArrayList(); + List>> futures = new ArrayList(); + Map level = new HashMap(); + + ExecutorService threadPool = null; + try { + threadPool = ExecutorUtil.newMDCAwareFixedThreadPool(4, new SolrjNamedThreadFactory("GatherNodesStream")); + + Map roots = new HashMap(); + + while (true) { + Tuple tuple = tupleStream.read(); + if (tuple.EOF) { + if (joinBatch.size() > 0) { + JoinRunner joinRunner = new JoinRunner(joinBatch); + Future future = threadPool.submit(joinRunner); + futures.add(future); + } + break; + } + + String value = tuple.getString(traverseFrom); + + if(traversal.getDepth() == 0) { + //This gathers the root nodes + //We check to see if there are dupes in the root nodes because root streams may not have been uniqued. 
+ String key = collection+"."+value; + if(!roots.containsKey(key)) { + Node node = new Node(value, trackTraversal); + if (metrics != null) { + List _metrics = new ArrayList(); + for (Metric metric : metrics) { + _metrics.add(metric.newInstance()); + } + node.setMetrics(_metrics); + } + + roots.put(key, node); + } else { + continue; + } + } + + joinBatch.add(value); + if (joinBatch.size() == 400) { + JoinRunner joinRunner = new JoinRunner(joinBatch); + Future future = threadPool.submit(joinRunner); + futures.add(future); + joinBatch = new ArrayList(); + } + } + + if(traversal.getDepth() == 0) { + traversal.addLevel(roots, collection, traverseFrom); + } + + this.traversal.setScatter(scatter); + + if(useDefaultTraversal) { + this.trackTraversal = traversal.getTrackTraversal(); + } else { + this.traversal.setTrackTraversal(trackTraversal); + } + + for (Future> future : futures) { + List tuples = future.get(); + for (Tuple tuple : tuples) { + String _traverseTo = tuple.getString(traverseTo); + String _gather = tuple.getString(gather); + String key = collection + "." 
+ _gather; + if (!traversal.visited(key, _traverseTo, tuple)) { + Node node = level.get(key); + if (node != null) { + node.add((traversal.getDepth()-1)+"^"+_traverseTo, tuple); + } else { + node = new Node(_gather, trackTraversal); + if (metrics != null) { + List _metrics = new ArrayList(); + for (Metric metric : metrics) { + _metrics.add(metric.newInstance()); + } + node.setMetrics(_metrics); + } + node.add((traversal.getDepth()-1)+"^"+_traverseTo, tuple); + level.put(key, node); + } + } + } + } + + traversal.addLevel(level, collection, gather); + out = traversal.iterator(); + } catch(Exception e) { + throw new RuntimeException(e); + } finally { + threadPool.shutdown(); + } + } + + if (out.hasNext()) { + return out.next(); + } else { + Map map = new HashMap(); + map.put("EOF", true); + Tuple tuple = new Tuple(map); + return tuple; + } + } + + public int getCost() { + return 0; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } + + class NodeStream extends TupleStream { + + private List ids; + private Iterator it; + + public NodeStream(List ids) { + this.ids = ids; + } + + public void open() {this.it = ids.iterator();} + public void close() {} + public StreamComparator getStreamSort() {return null;} + public List children() {return new ArrayList();} + public void setStreamContext(StreamContext context) {} + + public Tuple read() { + HashMap map = new HashMap(); + if(it.hasNext()) { + map.put("node",it.next()); + return new Tuple(map); + } else { + + map.put("EOF", true); + return new Tuple(map); + } + } + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java new file mode 100644 index 00000000000..befa5a7721c --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Node.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.graph; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.metrics.*; +import java.util.*; + +public class Node { + + private String id; + private List metrics; + private Set ancestors; + + public Node(String id, boolean track) { + this.id=id; + if(track) { + ancestors = new HashSet(); + } + } + + public void setMetrics(List metrics) { + this.metrics = metrics; + } + + public void add(String ancestor, Tuple tuple) { + if(ancestors != null) { + ancestors.add(ancestor); + } + + if(metrics != null) { + for(Metric metric : metrics) { + metric.update(tuple); + } + } + } + + public Tuple toTuple(String collection, String field, int level, Traversal traversal) { + Map map = new HashMap(); + + map.put("node", id); + map.put("collection", collection); + map.put("field", field); + map.put("level", level); + + boolean prependCollection = traversal.isMultiCollection(); + List cols = traversal.getCollections(); + + if(ancestors != null) { + List l = new ArrayList(); + for(String ancestor : ancestors) { + String[] ancestorParts = ancestor.split("\\^"); + + if(prependCollection) { + //prepend the collection + int colIndex = Integer.parseInt(ancestorParts[0]); + 
l.add(cols.get(colIndex)+"/"+ancestorParts[1]); + } else { + // Use only the ancestor id. + l.add(ancestorParts[1]); + } + } + + map.put("ancestors", l); + } + + if(metrics != null) { + for(Metric metric : metrics) { + map.put(metric.getIdentifier(), metric.getValue()); + } + } + + return new Tuple(map); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Traversal.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Traversal.java new file mode 100644 index 00000000000..43d23b33b19 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/Traversal.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.client.solrj.io.graph; + +import org.apache.solr.client.solrj.io.Tuple; +import java.util.*; + +public class Traversal { + + private List> graph = new ArrayList(); + private List fields = new ArrayList(); + private List collections = new ArrayList(); + private Set scatter = new HashSet(); + private Set collectionSet = new HashSet(); + private boolean trackTraversal; + private int depth; + + public void addLevel(Map level, String collection, String field) { + graph.add(level); + collections.add(collection); + collectionSet.add(collection); + fields.add(field); + ++depth; + } + + public int getDepth() { + return depth; + } + + public boolean getTrackTraversal() { + return this.trackTraversal; + } + + public boolean visited(String nodeId, String ancestorId, Tuple tuple) { + for(Map level : graph) { + Node node = level.get(nodeId); + if(node != null) { + node.add(depth+"^"+ancestorId, tuple); + return true; + } + } + return false; + } + + public boolean isMultiCollection() { + return collectionSet.size() > 1; + } + + public List> getGraph() { + return graph; + } + + public void setScatter(Set scatter) { + this.scatter = scatter; + } + + public Set getScatter() { + return this.scatter; + } + + public void setTrackTraversal(boolean trackTraversal) { + this.trackTraversal = trackTraversal; + } + + public List getCollections() { + return this.collections; + } + + public List getFields() { + return this.fields; + } + + public enum Scatter { + BRANCHES, + LEAVES; + } + + public Iterator iterator() { + return new TraversalIterator(this, scatter); + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/TraversalIterator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/TraversalIterator.java new file mode 100644 index 00000000000..7cfe3756fb7 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/graph/TraversalIterator.java @@ -0,0 +1,120 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.graph; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.graph.Traversal.Scatter; + +class TraversalIterator implements Iterator { + + private List> graph; + private List collections; + private List fields; + + private Iterator> graphIterator; + private Iterator levelIterator; + + private Iterator fieldIterator; + private Iterator collectionIterator; + private Iterator levelNumIterator; + private String outField; + private String outCollection; + private int outLevel; + private Traversal traversal; + + public TraversalIterator(Traversal traversal, Set scatter) { + this.traversal = traversal; + graph = traversal.getGraph(); + collections = traversal.getCollections(); + fields = traversal.getFields(); + + List outCollections = new ArrayList(); + List outFields = new ArrayList(); + List levelNums = new ArrayList(); + List> levelIterators = new ArrayList(); + + if(scatter.contains(Scatter.BRANCHES)) { + if(graph.size() > 1) { + for(int i=0; i graphLevel = graph.get(i); + String 
collection = collections.get(i); + String field = fields.get(i); + outCollections.add(collection); + outFields.add(field); + levelNums.add(i); + levelIterators.add(graphLevel.values().iterator()); + } + } + } + + if(scatter.contains(Scatter.LEAVES)) { + int leavesLevel = graph.size() > 1 ? graph.size()-1 : 0 ; + Map graphLevel = graph.get(leavesLevel); + String collection = collections.get(leavesLevel); + String field = fields.get(leavesLevel); + levelNums.add(leavesLevel); + outCollections.add(collection); + outFields.add(field); + levelIterators.add(graphLevel.values().iterator()); + } + + graphIterator = levelIterators.iterator(); + levelIterator = graphIterator.next(); + + fieldIterator = outFields.iterator(); + collectionIterator = outCollections.iterator(); + levelNumIterator = levelNums.iterator(); + + outField = fieldIterator.next(); + outCollection = collectionIterator.next(); + outLevel = levelNumIterator.next(); + } + + @Override + public boolean hasNext() { + if(levelIterator.hasNext()) { + return true; + } else { + if(graphIterator.hasNext()) { + levelIterator = graphIterator.next(); + outField = fieldIterator.next(); + outCollection = collectionIterator.next(); + outLevel = levelNumIterator.next(); + return hasNext(); + } else { + return false; + } + } + } + + @Override + public Tuple next() { + if(hasNext()) { + Node node = levelIterator.next(); + return node.toTuple(outCollection, outField, outLevel, traversal); + } else { + return null; + } + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java new file mode 100644 index 00000000000..3eb35c18618 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Iterator; + +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.params.ModifiableSolrParams; + +/** + * The RandomStream emits a stream of pseudo random 
Tuples that match the query parameters. Sample expression syntax: + * random(collection, q="Hello word", rows="50", fl="title, body") + **/ + +public class RandomStream extends TupleStream implements Expressible { + + private String zkHost; + private Map props; + private String collection; + protected transient SolrClientCache cache; + protected transient CloudSolrClient cloudSolrClient; + private Iterator documentIterator; + + public RandomStream(String zkHost, + String collection, + Map props) throws IOException { + init(zkHost, collection, props); + } + + public RandomStream(StreamExpression expression, StreamFactory factory) throws IOException{ + // grab all parameters out + String collectionName = factory.getValueOperand(expression, 0); + List namedParams = factory.getNamedOperands(expression); + StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost"); + + + // Collection Name + if(null == collectionName){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - collectionName expected as first operand",expression)); + } + + // Named parameters - passed directly to solr as solrparams + if(0 == namedParams.size()){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - at least one named parameter expected. eg. 
'q=*:*'",expression)); + } + + // pull out known named params + Map params = new HashMap(); + for(StreamExpressionNamedParameter namedParam : namedParams){ + if(!namedParam.getName().equals("zkHost") && !namedParam.getName().equals("buckets") && !namedParam.getName().equals("bucketSorts") && !namedParam.getName().equals("limit")){ + params.put(namedParam.getName(), namedParam.getParameter().toString().trim()); + } + } + + // zkHost, optional - if not provided then will look into factory list to get + String zkHost = null; + if(null == zkHostExpression){ + zkHost = factory.getCollectionZkHost(collectionName); + if(zkHost == null) { + zkHost = factory.getDefaultZkHost(); + } + } + else if(zkHostExpression.getParameter() instanceof StreamExpressionValue){ + zkHost = ((StreamExpressionValue)zkHostExpression.getParameter()).getValue(); + } + if(null == zkHost){ + throw new IOException(String.format(Locale.ROOT,"invalid expression %s - zkHost not found for collection '%s'",expression,collectionName)); + } + + // We've got all the required items + init(zkHost, collectionName, params); + } + + private void init(String zkHost, String collection, Map props) throws IOException { + this.zkHost = zkHost; + this.props = props; + this.collection = collection; + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + // collection + expression.addParameter(collection); + + // parameters + for(Entry param : props.entrySet()){ + expression.addParameter(new StreamExpressionNamedParameter(param.getKey(), param.getValue())); + } + + // zkHost + expression.addParameter(new StreamExpressionNamedParameter("zkHost", zkHost)); + + return expression; + } + + + public void setStreamContext(StreamContext context) { + cache = context.getSolrClientCache(); + } + + public List children() { + List l = new ArrayList(); + return l; 
+ } + + public void open() throws IOException { + if(cache != null) { + cloudSolrClient = cache.getCloudSolrClient(zkHost); + } else { + cloudSolrClient = new CloudSolrClient(zkHost); + } + + ModifiableSolrParams params = getParams(this.props); + + params.remove("sort"); //Override any sort. + + Random rand = new Random(); + int seed = rand.nextInt(); + + String sortField = "random_"+seed; + params.add("sort", sortField+" asc"); + + QueryRequest request = new QueryRequest(params); + try { + QueryResponse response = request.process(cloudSolrClient, collection); + SolrDocumentList docs = response.getResults(); + documentIterator = docs.iterator(); + } catch (Exception e) { + throw new IOException(e); + } + } + + public void close() throws IOException { + if(cache == null) { + cloudSolrClient.close(); + } + } + + public Tuple read() throws IOException { + if(documentIterator.hasNext()) { + Map map = new HashMap(); + SolrDocument doc = documentIterator.next(); + for(String key : doc.keySet()) { + map.put(key, doc.get(key)); + } + return new Tuple(map); + } else { + Map fields = new HashMap(); + fields.put("EOF", true); + Tuple tuple = new Tuple(fields); + return tuple; + } + } + + private ModifiableSolrParams getParams(Map props) { + ModifiableSolrParams params = new ModifiableSolrParams(); + for(String key : props.keySet()) { + String value = props.get(key); + params.add(key, value); + } + return params; + } + + public int getCost() { + return 0; + } + + @Override + public StreamComparator getStreamSort() { + return null; + } +} \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java index ff0aefa4d9a..87e30356ec3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java @@ -49,6 +49,10 @@ public class 
StreamContext implements Serializable{ this.entries.put(key, value); } + public Map getEntries() { + return this.entries; + } + public void setSolrClientCache(SolrClientCache clientCache) { this.clientCache = clientCache; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java index 0e19177ff05..445b530163d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/CountMetric.java @@ -49,6 +49,10 @@ public class CountMetric extends Metric implements Serializable { init(functionName); } + + public String[] getColumns() { + return new String[0]; + } private void init(String functionName){ setFunctionName(functionName); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MaxMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MaxMetric.java index 8f2069e472d..0594bf42249 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MaxMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MaxMetric.java @@ -67,6 +67,11 @@ public class MaxMetric extends Metric implements Serializable { } } + public String[] getColumns() { + String[] cols = {columnName}; + return cols; + } + public void update(Tuple tuple) { Object o = tuple.get(columnName); if(o instanceof Double) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MeanMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MeanMetric.java index 0a5726c95bc..097e04b822b 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MeanMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MeanMetric.java @@ -80,6 +80,11 @@ public class MeanMetric extends Metric implements 
Serializable { return new MeanMetric(columnName); } + public String[] getColumns() { + String[] cols = {columnName}; + return cols; + } + public double getValue() { double dcount = (double)count; if(longSum == 0) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java index e7321828dd0..07a400a50e8 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/Metric.java @@ -54,4 +54,6 @@ public abstract class Metric implements Serializable, Expressible { public abstract double getValue(); public abstract void update(Tuple tuple); public abstract Metric newInstance(); + public abstract String[] getColumns(); + } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MinMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MinMetric.java index 7c6060e9ddf..0a565809080 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MinMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MinMetric.java @@ -56,7 +56,12 @@ public class MinMetric extends Metric { setFunctionName(functionName); setIdentifier(functionName, "(", columnName, ")"); } - + + + public String[] getColumns() { + String[] cols = {columnName}; + return cols; + } public double getValue() { if(longMin == Long.MAX_VALUE) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/SumMetric.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/SumMetric.java index 805f9781283..578dae764fb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/SumMetric.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/SumMetric.java @@ -58,6 +58,11 @@ public class SumMetric extends 
Metric implements Serializable { setIdentifier(functionName, "(", columnName, ")"); } + public String[] getColumns() { + String[] cols = {columnName}; + return cols; + } + public void update(Tuple tuple) { Object o = tuple.get(columnName); if(o instanceof Double) { diff --git a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml index 7a7ee52a56d..575b6225331 100644 --- a/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml +++ b/solr/solrj/src/test-files/solrj/solr/collection1/conf/schema-streaming.xml @@ -51,6 +51,8 @@ + +