From d200689b2b89803167596896aebf882f01c4a465 Mon Sep 17 00:00:00 2001 From: Joel Bernstein Date: Wed, 16 Dec 2015 19:44:43 +0000 Subject: [PATCH] SOLR-6398: Add IterativeMergeStrategy to support running Parallel Iterative Algorithms inside of Solr git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1720422 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../component/IterativeMergeStrategy.java | 124 ++++++++++++++++++ .../search/AnalyticsMergeStrategyTest.java | 23 ++++ .../search/TestAnalyticsQParserPlugin.java | 67 +++++++++- 4 files changed, 211 insertions(+), 6 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 6ad6059d288..5db8296d008 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -118,6 +118,9 @@ New Features * SOLR-7904: Add StreamExpression Support to FacetStream (Dennis Gove) +* SOLR-6398: Add IterativeMergeStrategy to support running Parallel Iterative Algorithms inside of Solr + (Joel Bernstein) + Bug Fixes ---------------------- * SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been diff --git a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java new file mode 100644 index 00000000000..e0de63c4080 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java @@ -0,0 +1,124 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.solr.handler.component; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.List; +import java.util.ArrayList; + +import org.apache.lucene.util.NamedThreadFactory; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrjNamedThreadFactory; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.http.client.HttpClient; + +public abstract class IterativeMergeStrategy implements MergeStrategy { + + protected ExecutorService executorService; + protected static HttpClient httpClient; + + static { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128); + params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32); + httpClient = HttpClientUtil.createClient(params); + } + + public void merge(ResponseBuilder rb, ShardRequest sreq) { + rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise. + rb.onePassDistributedQuery = true; // Turn off the second pass distributed. + executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("IterativeMergeStrategy")); + try { + process(rb, sreq); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + executorService.shutdownNow(); + } + } + + public boolean mergesIds() { + return true; + } + + public int getCost() { + return 0; + } + + public boolean handlesMergeFields() { + return false; + } + + public void handleMergeFields(ResponseBuilder rb, SolrIndexSearcher searcher) { + + } + + public static class CallBack implements Callable { + private HttpSolrClient solrClient; + private QueryRequest req; + private QueryResponse response; + private ShardResponse originalShardResponse; + + public CallBack(ShardResponse originalShardResponse, QueryRequest req) { + this.solrClient = new HttpSolrClient(originalShardResponse.getShardAddress(), httpClient); + this.req = req; + this.originalShardResponse = originalShardResponse; + req.setMethod(SolrRequest.METHOD.POST); + ModifiableSolrParams params = (ModifiableSolrParams)req.getParams(); + params.add("distrib", "false"); + } + + public QueryResponse getResponse() { + return this.response; + } + + public ShardResponse getOriginalShardResponse() { + return this.originalShardResponse; + } + + public CallBack call() throws Exception{ + this.response = req.process(solrClient); + return this; + } + } + + public List> callBack(List responses, QueryRequest req) { + List> futures = new ArrayList(); + for(ShardResponse response : responses) { + futures.add(this.executorService.submit(new CallBack(response, req))); + } + return futures; + } + + public Future callBack(ShardResponse response, QueryRequest req) { + return this.executorService.submit(new CallBack(response, req)); + } + + protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception; + +} \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/search/AnalyticsMergeStrategyTest.java b/solr/core/src/test/org/apache/solr/search/AnalyticsMergeStrategyTest.java index f62a8df0134..14abd745229 100644 --- a/solr/core/src/test/org/apache/solr/search/AnalyticsMergeStrategyTest.java +++ b/solr/core/src/test/org/apache/solr/search/AnalyticsMergeStrategyTest.java @@ -59,6 +59,12 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase { commit(); + /* + * The count qparser plugin is pointing to the TestAnalyticsQParserPlugin. This class defines a simple AnalyticsQuery and + * has two merge strategies. If the iterate local param is true then an InterativeMergeStrategy is used. + */ + + ModifiableSolrParams params = new ModifiableSolrParams(); params.add("q", "*:*"); params.add("fq", "{!count}"); @@ -66,6 +72,14 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase { QueryResponse rsp = queryServer(params); assertCount(rsp, 11); + //Test IterativeMergeStrategy + params = new ModifiableSolrParams(); + params.add("q", "*:*"); + params.add("fq", "{!count iterate=true}"); + setDistributedParams(params); + rsp = queryServer(params); + assertCountOnly(rsp, 44); + params = new ModifiableSolrParams(); params.add("q", "id:(1 2 5 6)"); params.add("fq", "{!count}"); @@ -74,6 +88,15 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase { assertCount(rsp, 4); } + private void assertCountOnly(QueryResponse rsp, int count) throws Exception { + NamedList response = rsp.getResponse(); + NamedList analytics = (NamedList)response.get("analytics"); + Integer c = (Integer)analytics.get("mycount"); + if(c.intValue() != count) { + throw new Exception("Count is not correct:"+count+":"+c.intValue()); + } + } + private void assertCount(QueryResponse rsp, int count) throws Exception { NamedList response = rsp.getResponse(); NamedList analytics = (NamedList)response.get("analytics"); diff --git a/solr/core/src/test/org/apache/solr/search/TestAnalyticsQParserPlugin.java b/solr/core/src/test/org/apache/solr/search/TestAnalyticsQParserPlugin.java index b086523c5b7..c2ea050e627 100644 --- a/solr/core/src/test/org/apache/solr/search/TestAnalyticsQParserPlugin.java +++ b/solr/core/src/test/org/apache/solr/search/TestAnalyticsQParserPlugin.java @@ -17,11 +17,15 @@ package org.apache.solr.search; -import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Query; import org.apache.lucene.search.IndexSearcher; +import org.apache.solr.client.solrj.request.QueryRequest; + +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.handler.component.IterativeMergeStrategy; import org.apache.solr.handler.component.ResponseBuilder; import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.request.SolrQueryRequest; @@ -29,6 +33,8 @@ import org.apache.solr.handler.component.MergeStrategy; import org.apache.solr.handler.component.ShardResponse; import org.junit.Ignore; +import java.util.List; +import java.util.concurrent.Future; import java.io.IOException; @Ignore @@ -50,26 +56,36 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin { } public Query parse() throws SyntaxError { - return new TestAnalyticsQuery(new TestAnalyticsMergeStrategy()); + int base = localParams.getInt("base", 0); + boolean iterate = localParams.getBool("iterate", false); + if(iterate) + return new TestAnalyticsQuery(base, new TestIterative()); + else + return new TestAnalyticsQuery(base, new TestAnalyticsMergeStrategy()); } } class TestAnalyticsQuery extends AnalyticsQuery { - public TestAnalyticsQuery(MergeStrategy mergeStrategy) { + private int base; + + public TestAnalyticsQuery(int base, MergeStrategy mergeStrategy) { super(mergeStrategy); + this.base = base; } public DelegatingCollector getAnalyticsCollector(ResponseBuilder rb, IndexSearcher searcher) { - return new TestAnalyticsCollector(rb); + return new TestAnalyticsCollector(base, rb); } } class TestAnalyticsCollector extends DelegatingCollector { ResponseBuilder rb; int count; + int base; - public TestAnalyticsCollector(ResponseBuilder rb) { + public TestAnalyticsCollector(int base, ResponseBuilder rb) { + this.base = base; this.rb = rb; } @@ -81,7 +97,7 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin { public void finish() throws IOException { NamedList analytics = new NamedList(); rb.rsp.add("analytics", analytics); - analytics.add("mycount", count); + analytics.add("mycount", count+base); if(this.delegate instanceof DelegatingCollector) { ((DelegatingCollector)this.delegate).finish(); } @@ -120,4 +136,43 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin { rb.rsp.add("analytics", merged); } } + + class TestIterative extends IterativeMergeStrategy { + + public void process(ResponseBuilder rb, ShardRequest sreq) throws Exception { + int count = 0; + for(ShardResponse shardResponse : sreq.responses) { + NamedList response = shardResponse.getSolrResponse().getResponse(); + NamedList analytics = (NamedList)response.get("analytics"); + Integer c = (Integer)analytics.get("mycount"); + count += c.intValue(); + } + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add("distrib", "false"); + params.add("fq","{!count base="+count+"}"); + params.add("q","*:*"); + + + /* + * Call back to all the shards in the response and process the result. + */ + + QueryRequest request = new QueryRequest(params); + List> futures = callBack(sreq.responses, request); + + int nextCount = 0; + + for(Future future : futures) { + QueryResponse response = future.get().getResponse(); + NamedList analytics = (NamedList)response.getResponse().get("analytics"); + Integer c = (Integer)analytics.get("mycount"); + nextCount += c.intValue(); + } + + NamedList merged = new NamedList(); + merged.add("mycount", nextCount); + rb.rsp.add("analytics", merged); + } + } }