SOLR-6398: Add IterativeMergeStrategy to support running Parallel Iterative Algorithms inside of Solr

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1720422 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Bernstein 2015-12-16 19:44:43 +00:00
parent 8f9b46d024
commit d200689b2b
4 changed files with 211 additions and 6 deletions

View File

@ -118,6 +118,9 @@ New Features
* SOLR-7904: Add StreamExpression Support to FacetStream (Dennis Gove)
* SOLR-6398: Add IterativeMergeStrategy to support running Parallel Iterative Algorithms inside of Solr
(Joel Bernstein)
Bug Fixes
----------------------
* SOLR-8386: Add field option in the new admin UI schema page loads up even when no schemaFactory has been

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler.component;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.ArrayList;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.http.client.HttpClient;
public abstract class IterativeMergeStrategy implements MergeStrategy {
protected ExecutorService executorService;
protected static HttpClient httpClient;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
httpClient = HttpClientUtil.createClient(params);
}
public void merge(ResponseBuilder rb, ShardRequest sreq) {
rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise.
rb.onePassDistributedQuery = true; // Turn off the second pass distributed.
executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("IterativeMergeStrategy"));
try {
process(rb, sreq);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executorService.shutdownNow();
}
}
public boolean mergesIds() {
return true;
}
public int getCost() {
return 0;
}
public boolean handlesMergeFields() {
return false;
}
public void handleMergeFields(ResponseBuilder rb, SolrIndexSearcher searcher) {
}
public static class CallBack implements Callable<CallBack> {
private HttpSolrClient solrClient;
private QueryRequest req;
private QueryResponse response;
private ShardResponse originalShardResponse;
public CallBack(ShardResponse originalShardResponse, QueryRequest req) {
this.solrClient = new HttpSolrClient(originalShardResponse.getShardAddress(), httpClient);
this.req = req;
this.originalShardResponse = originalShardResponse;
req.setMethod(SolrRequest.METHOD.POST);
ModifiableSolrParams params = (ModifiableSolrParams)req.getParams();
params.add("distrib", "false");
}
public QueryResponse getResponse() {
return this.response;
}
public ShardResponse getOriginalShardResponse() {
return this.originalShardResponse;
}
public CallBack call() throws Exception{
this.response = req.process(solrClient);
return this;
}
}
public List<Future<CallBack>> callBack(List<ShardResponse> responses, QueryRequest req) {
List<Future<CallBack>> futures = new ArrayList();
for(ShardResponse response : responses) {
futures.add(this.executorService.submit(new CallBack(response, req)));
}
return futures;
}
public Future<CallBack> callBack(ShardResponse response, QueryRequest req) {
return this.executorService.submit(new CallBack(response, req));
}
protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception;
}

View File

@ -59,6 +59,12 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase {
commit();
/*
* The count qparser plugin is pointing to the TestAnalyticsQParserPlugin. This class defines a simple AnalyticsQuery and
* has two merge strategies. If the iterate local param is true then an InterativeMergeStrategy is used.
*/
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "*:*");
params.add("fq", "{!count}");
@ -66,6 +72,14 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase {
QueryResponse rsp = queryServer(params);
assertCount(rsp, 11);
//Test IterativeMergeStrategy
params = new ModifiableSolrParams();
params.add("q", "*:*");
params.add("fq", "{!count iterate=true}");
setDistributedParams(params);
rsp = queryServer(params);
assertCountOnly(rsp, 44);
params = new ModifiableSolrParams();
params.add("q", "id:(1 2 5 6)");
params.add("fq", "{!count}");
@ -74,6 +88,15 @@ public class AnalyticsMergeStrategyTest extends BaseDistributedSearchTestCase {
assertCount(rsp, 4);
}
private void assertCountOnly(QueryResponse rsp, int count) throws Exception {
NamedList response = rsp.getResponse();
NamedList analytics = (NamedList)response.get("analytics");
Integer c = (Integer)analytics.get("mycount");
if(c.intValue() != count) {
throw new Exception("Count is not correct:"+count+":"+c.intValue());
}
}
private void assertCount(QueryResponse rsp, int count) throws Exception {
NamedList response = rsp.getResponse();
NamedList analytics = (NamedList)response.get("analytics");

View File

@ -17,11 +17,15 @@
package org.apache.solr.search;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.IndexSearcher;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.component.IterativeMergeStrategy;
import org.apache.solr.handler.component.ResponseBuilder;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.request.SolrQueryRequest;
@ -29,6 +33,8 @@ import org.apache.solr.handler.component.MergeStrategy;
import org.apache.solr.handler.component.ShardResponse;
import org.junit.Ignore;
import java.util.List;
import java.util.concurrent.Future;
import java.io.IOException;
@Ignore
@ -50,26 +56,36 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin {
}
public Query parse() throws SyntaxError {
return new TestAnalyticsQuery(new TestAnalyticsMergeStrategy());
int base = localParams.getInt("base", 0);
boolean iterate = localParams.getBool("iterate", false);
if(iterate)
return new TestAnalyticsQuery(base, new TestIterative());
else
return new TestAnalyticsQuery(base, new TestAnalyticsMergeStrategy());
}
}
class TestAnalyticsQuery extends AnalyticsQuery {
public TestAnalyticsQuery(MergeStrategy mergeStrategy) {
private int base;
public TestAnalyticsQuery(int base, MergeStrategy mergeStrategy) {
super(mergeStrategy);
this.base = base;
}
public DelegatingCollector getAnalyticsCollector(ResponseBuilder rb, IndexSearcher searcher) {
return new TestAnalyticsCollector(rb);
return new TestAnalyticsCollector(base, rb);
}
}
class TestAnalyticsCollector extends DelegatingCollector {
ResponseBuilder rb;
int count;
int base;
public TestAnalyticsCollector(ResponseBuilder rb) {
public TestAnalyticsCollector(int base, ResponseBuilder rb) {
this.base = base;
this.rb = rb;
}
@ -81,7 +97,7 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin {
public void finish() throws IOException {
NamedList analytics = new NamedList();
rb.rsp.add("analytics", analytics);
analytics.add("mycount", count);
analytics.add("mycount", count+base);
if(this.delegate instanceof DelegatingCollector) {
((DelegatingCollector)this.delegate).finish();
}
@ -120,4 +136,43 @@ public class TestAnalyticsQParserPlugin extends QParserPlugin {
rb.rsp.add("analytics", merged);
}
}
class TestIterative extends IterativeMergeStrategy {
public void process(ResponseBuilder rb, ShardRequest sreq) throws Exception {
int count = 0;
for(ShardResponse shardResponse : sreq.responses) {
NamedList response = shardResponse.getSolrResponse().getResponse();
NamedList analytics = (NamedList)response.get("analytics");
Integer c = (Integer)analytics.get("mycount");
count += c.intValue();
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("distrib", "false");
params.add("fq","{!count base="+count+"}");
params.add("q","*:*");
/*
* Call back to all the shards in the response and process the result.
*/
QueryRequest request = new QueryRequest(params);
List<Future<CallBack>> futures = callBack(sreq.responses, request);
int nextCount = 0;
for(Future<CallBack> future : futures) {
QueryResponse response = future.get().getResponse();
NamedList analytics = (NamedList)response.getResponse().get("analytics");
Integer c = (Integer)analytics.get("mycount");
nextCount += c.intValue();
}
NamedList merged = new NamedList();
merged.add("mycount", nextCount);
rb.rsp.add("analytics", merged);
}
}
}