SOLR-1431 CommComponent abstracted

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1138383 13f79535-47bb-0310-9956-ffa450edef68
Noble Paul 2011-06-22 10:25:40 +00:00
parent 4ce9d15bb4
commit 3df588d55d
7 changed files with 494 additions and 296 deletions


@@ -0,0 +1,333 @@
package org.apache.solr.handler.component;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import java.util.*;
import java.util.concurrent.*;
public class HttpShardHandler extends ShardHandler {
private HttpShardHandlerFactory httpShardHandlerFactory;
private CompletionService<ShardResponse> completionService;
private Set<Future<ShardResponse>> pending;
private Map<String,List<String>> shardToURLs;
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
this.httpShardHandlerFactory = httpShardHandlerFactory;
completionService = new ExecutorCompletionService<ShardResponse>(httpShardHandlerFactory.commExecutor);
pending = new HashSet<Future<ShardResponse>>();
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
// This is primarily to keep track of what order we should use to query the replicas of a shard
// so that we use the same replica for all phases of a distributed request.
shardToURLs = new HashMap<String,List<String>>();
}
private static class SimpleSolrResponse extends SolrResponse {
long elapsedTime;
NamedList<Object> nl;
@Override
public long getElapsedTime() {
return elapsedTime;
}
@Override
public NamedList<Object> getResponse() {
return nl;
}
@Override
public void setResponse(NamedList<Object> rsp) {
nl = rsp;
}
}
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls==null) {
urls = StrUtils.splitSmart(shard, "|", true);
// convert shard to URL
for (int i=0; i<urls.size(); i++) {
urls.set(i, httpShardHandlerFactory.scheme + urls.get(i));
}
//
// Shuffle the list instead of using round-robin by default.
// This prevents accidental synchronization where multiple shards could get in sync
// and query the same replica at the same time.
//
if (urls.size() > 1)
Collections.shuffle(urls, httpShardHandlerFactory.r);
shardToURLs.put(shard, urls);
}
return urls;
}
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
Callable<ShardResponse> task = new Callable<ShardResponse>() {
public ShardResponse call() throws Exception {
ShardResponse srsp = new ShardResponse();
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
long startTime = System.currentTimeMillis();
try {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
// SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
req.setMethod(SolrRequest.METHOD.POST);
// no need to set the response parser as binary is the default
// req.setResponseParser(new BinaryResponseParser());
// if there are no shards available for a slice, urls.size()==0
if (urls.size()==0) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
}
if (urls.size() <= 1) {
String url = urls.get(0);
srsp.setShardAddress(url);
SolrServer server = new CommonsHttpSolrServer(url, httpShardHandlerFactory.client);
ssr.nl = server.request(req);
} else {
LBHttpSolrServer.Rsp rsp = httpShardHandlerFactory.loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
}
} catch (Throwable th) {
srsp.setException(th);
if (th instanceof SolrException) {
srsp.setResponseCode(((SolrException)th).code());
} else {
srsp.setResponseCode(-1);
}
}
ssr.elapsedTime = System.currentTimeMillis() - startTime;
return srsp;
}
};
pending.add( completionService.submit(task) );
}
/** returns a ShardResponse of the last response correlated with a ShardRequest */
ShardResponse take() {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
/** returns a ShardResponse of the last response correlated with a ShardRequest,
* or immediately returns a ShardResponse if there was an error detected
*/
public ShardResponse takeCompletedOrError() {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
if (rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same
// request more than once.
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
public void cancelAll() {
for (Future<ShardResponse> future : pending) {
// TODO: any issues with interrupting? shouldn't be if
// there are finally blocks to release connections.
future.cancel(true);
}
}
public void checkDistributed(ResponseBuilder rb) {
SolrQueryRequest req = rb.req;
SolrParams params = req.getParams();
rb.isDistrib = params.getBool("distrib",false);
String shards = params.get(ShardParams.SHARDS);
// for back compat, a shards param with URLs like localhost:8983/solr will mean that this
// search is distributed.
boolean hasShardURL = shards != null && shards.indexOf('/') > 0;
rb.isDistrib = hasShardURL | rb.isDistrib;
if (rb.isDistrib) {
// since the cost of grabbing cloud state is still up in the air, we grab it only
// if we need it.
CloudState cloudState = null;
Map<String,Slice> slices = null;
CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
if (shards != null) {
List<String> lst = StrUtils.splitSmart(shards, ",", true);
rb.shards = lst.toArray(new String[lst.size()]);
rb.slices = new String[rb.shards.length];
if (zkController != null) {
// figure out which shards are slices
for (int i=0; i<rb.shards.length; i++) {
if (rb.shards[i].indexOf('/') < 0) {
// this is a logical shard
rb.slices[i] = rb.shards[i];
rb.shards[i] = null;
}
}
}
} else if (zkController != null) {
// we weren't provided with a list of slices to query, so find the list that will cover the complete index
cloudState = zkController.getCloudState();
// TODO: check "collection" for which collection(s) to search.. but for now, just default
// to the collection for this core.
// This can be more efficient... we only record the name, even though we have the
// shard info we need in the next step of mapping slice->shards
slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
rb.slices = slices.keySet().toArray(new String[slices.size()]);
rb.shards = new String[rb.slices.length];
/***
rb.slices = new String[slices.size()];
for (int i=0; i<rb.slices.length; i++) {
rb.slices[i] = slices.get(i).getName();
}
***/
}
//
// Map slices to shards
//
if (zkController != null) {
for (int i=0; i<rb.shards.length; i++) {
if (rb.shards[i] == null) {
if (cloudState == null) {
cloudState = zkController.getCloudState();
slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
}
String sliceName = rb.slices[i];
Slice slice = slices.get(sliceName);
if (slice==null) {
// Treat this the same as "all servers down" for a slice, and let things continue
// if partial results are acceptable
rb.shards[i] = "";
continue;
// throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
}
Map<String, ZkNodeProps> sliceShards = slice.getShards();
// For now, recreate the | delimited list of equivalent servers
Set<String> liveNodes = cloudState.getLiveNodes();
StringBuilder sliceShardsStr = new StringBuilder();
boolean first = true;
for (ZkNodeProps nodeProps : sliceShards.values()) {
if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
continue;
if (first) {
first = false;
} else {
sliceShardsStr.append('|');
}
String url = nodeProps.get("url");
if (url.startsWith("http://"))
url = url.substring(7);
sliceShardsStr.append(url);
}
rb.shards[i] = sliceShardsStr.toString();
}
}
}
}
String shards_rows = params.get(ShardParams.SHARDS_ROWS);
if(shards_rows != null) {
rb.shards_rows = Integer.parseInt(shards_rows);
}
String shards_start = params.get(ShardParams.SHARDS_START);
if(shards_start != null) {
rb.shards_start = Integer.parseInt(shards_start);
}
}
}
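
checkDistributed accepts two addressing styles in the shards parameter: explicit addresses (anything containing a '/', with '|' separating equivalent replicas of the same shard) and, when ZooKeeper is enabled, logical slice names that are resolved to live replicas from the cloud state. A minimal illustration of both styles, built directly with ModifiableSolrParams; the hosts, ports and slice names are placeholders:

ModifiableSolrParams p = new ModifiableSolrParams();
p.set("distrib", "true");
// explicit addresses: '/' marks them as URLs, '|' separates equivalent replicas of one shard
p.set(ShardParams.SHARDS, "localhost:8983/solr|localhost:7574/solr,localhost:8985/solr");
// with ZooKeeper, logical slice names (no '/') that checkDistributed maps to live replicas:
// p.set(ShardParams.SHARDS, "slice1,slice2");
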


@@ -0,0 +1,109 @@
package org.apache.solr.handler.component;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.MalformedURLException;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author noble.paul@teamaol.com (noblep01)
* Date: 6/21/11
* Time: 12:14 PM
*/
public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{
protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
// We want an executor that doesn't take up any resources if
// it's not used, so it could be created statically for
// the distributed search component if desired.
//
// Consider CallerRuns policy and a lower max threads to throttle
// requests at some point (or should we simply return failure?)
Executor commExecutor = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
new SynchronousQueue<Runnable>() // directly hand off tasks
);
HttpClient client;
Random r = new Random();
LBHttpSolrServer loadbalancer;
int soTimeout = 0; //current default values
int connectionTimeout = 0; //current default values
public String scheme = "http://"; //current default values
// socket timeout measured in ms; closes a socket if a read
// takes longer than x ms to complete, throwing a
// java.net.SocketTimeoutException: Read timed out
static final String INIT_SO_TIMEOUT = "socketTimeout";
// connection timeout measured in ms; closes a socket if a connection
// cannot be established within x ms, throwing a
// java.net.SocketTimeoutException: Connection timed out
static final String INIT_CONNECTION_TIMEOUT = "connTimeout";
// URL scheme to be used in distributed search.
static final String INIT_URL_SCHEME = "urlScheme";
public ShardHandler getShardHandler(){
return new HttpShardHandler(this);
}
public void init(PluginInfo info) {
if (info.initArgs != null) {
Object so = info.initArgs.get(INIT_SO_TIMEOUT);
if (so != null) {
soTimeout = (Integer) so;
log.info("Setting socketTimeout to: " + soTimeout);
}
Object urlScheme = info.initArgs.get(INIT_URL_SCHEME);
if (urlScheme != null) {
scheme = urlScheme + "://";
log.info("Setting urlScheme to: " + urlScheme);
}
Object co = info.initArgs.get(INIT_CONNECTION_TIMEOUT);
if (co != null) {
connectionTimeout = (Integer) co;
log.info("Setting shard-connection-timeout to: " + connectionTimeout);
}
}
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(20);
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(connectionTimeout);
mgr.getParams().setSoTimeout(soTimeout);
// mgr.getParams().setStaleCheckingEnabled(false);
client = new HttpClient(mgr);
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
try {
loadbalancer = new LBHttpSolrServer(client);
} catch (MalformedURLException e) {
// should be impossible since we're not passing any URLs here
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
}
}
}
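
The factory reads socketTimeout, connTimeout and urlScheme from the initArgs of the PluginInfo it is initialized with; when solrconfig.xml declares no shardHandlerFactory child, SearchHandler (below) falls back to building a default PluginInfo itself. A rough sketch of wiring the factory up programmatically, assuming the same four-argument PluginInfo constructor used in SearchHandler below; the timeout values are arbitrary:

Map<String, String> attrs = new HashMap<String, String>();
attrs.put("class", HttpShardHandlerFactory.class.getName());
NamedList<Object> args = new NamedList<Object>();
args.add("socketTimeout", 1000); // ms before an in-flight read is abandoned
args.add("connTimeout", 500);    // ms allowed for establishing a connection
args.add("urlScheme", "http");   // prepended as "http://" to bare host:port shard entries
PluginInfo info = new PluginInfo("shardHandlerFactory", attrs, args, Collections.<PluginInfo>emptyList());
HttpShardHandlerFactory factory = new HttpShardHandlerFactory();
factory.init(info);
ShardHandler handler = factory.getShardHandler();
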


@@ -126,7 +126,6 @@ public class QueryComponent extends SearchComponent
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
checkDistributed(rb);
}
@@ -181,7 +180,7 @@ public class QueryComponent extends SearchComponent
slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
rb.slices = slices.keySet().toArray(new String[slices.size()]);
rb.shards = new String[rb.slices.length];
/***
rb.slices = new String[slices.size()];
for (int i=0; i<rb.slices.length; i++) {


@@ -17,44 +17,13 @@
package org.apache.solr.handler.component;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.lucene.queryParser.ParseException;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RTimer;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
@@ -64,6 +33,8 @@ import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
*
* Refer SOLR-281
@@ -75,26 +46,15 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
static final String INIT_FIRST_COMPONENTS = "first-components";
static final String INIT_LAST_COMPONENTS = "last-components";
// socket timeout measured in ms, closes a socket if read
// takes longer than x ms to complete. throws
// java.net.SocketTimeoutException: Read timed out exception
static final String INIT_SO_TIMEOUT = "shard-socket-timeout";
// connection timeout measures in ms, closes a socket if connection
// cannot be established within x ms. with a
// java.net.SocketTimeoutException: Connection timed out
static final String INIT_CONNECTION_TIMEOUT = "shard-connection-timeout";
// URL scheme to be used in distributed search.
static final String INIT_URL_SCHEME = "url-scheme";
static int soTimeout = 0; //current default values
static int connectionTimeout = 0; //current default values
public static String scheme = "http://"; //current default values
protected static Logger log = LoggerFactory.getLogger(SearchHandler.class);
protected List<SearchComponent> components = null;
private ShardHandlerFactory shardHandlerFactory ;
private PluginInfo shfInfo;
protected List<String> getDefaultComponents()
{
@@ -108,6 +68,16 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
return names;
}
public void init(PluginInfo info) {
init(info.initArgs);
for (PluginInfo child : info.children) {
if("shardHandlerFactory".equals(child.type)){
this.shfInfo = child;
break;
}
}
}
/**
* Initialize the components based on name. Note, if using {@link #INIT_FIRST_COMPONENTS} or {@link #INIT_LAST_COMPONENTS},
* then the {@link DebugComponent} will always occur last. If this is not desired, then one must explicitly declare all components using
@@ -161,24 +131,12 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
components.add(dbgCmp);
log.info("Adding debug component:" + dbgCmp);
}
Object co = initArgs.get(INIT_CONNECTION_TIMEOUT);
if (co != null) {
connectionTimeout = (Integer) co;
log.info("Setting shard-connection-timeout to: " + connectionTimeout);
}
Object so = initArgs.get(INIT_SO_TIMEOUT);
if (so != null) {
soTimeout = (Integer) so;
log.info("Setting shard-socket-timeout to: " + soTimeout);
}
Object urlScheme = initArgs.get(INIT_URL_SCHEME);
if (urlScheme != null) {
SearchHandler.scheme = (String) urlScheme + "://";
log.info("Setting url-scheme to: " + urlScheme);
if(shfInfo ==null) {
Map m = new HashMap();
m.put("class",HttpShardHandlerFactory.class.getName());
shfInfo = new PluginInfo("shardHandlerFactory", m,null,Collections.<PluginInfo>emptyList());
}
shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
}
public List<SearchComponent> getComponents() {
@@ -204,6 +162,10 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
final RTimer timer = rb.isDebug() ? new RTimer() : null;
ShardHandler shardHandler1 = shardHandlerFactory.getShardHandler();
shardHandler1.checkDistributed(rb);
if (timer == null) {
// non-debugging prepare phase
for( SearchComponent c : components ) {
@@ -251,8 +213,6 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
} else {
// a distributed request
HttpCommComponent comm = new HttpCommComponent();
if (rb.outgoing == null) {
rb.outgoing = new LinkedList<ShardRequest>();
}
@@ -295,13 +255,13 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
// we could try and detect when this is needed, but it could be tricky
params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
}
String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
if (shardHandler == null) {
String shardQt = req.getParams().get(ShardParams.SHARDS_QT);
if (shardQt == null) {
params.remove(CommonParams.QT);
} else {
params.set(CommonParams.QT, shardHandler);
params.set(CommonParams.QT, shardQt);
}
comm.submit(sreq, shard, params);
shardHandler1.submit(sreq, shard, params);
}
}
@@ -310,13 +270,13 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
// the outgoing queue, send them out immediately (by exiting
// this loop)
while (rb.outgoing.size() == 0) {
ShardResponse srsp = comm.takeCompletedOrError();
ShardResponse srsp = shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for
// Was there an exception? If so, abort everything and
// rethrow
if (srsp.getException() != null) {
comm.cancelAll();
shardHandler1.cancelAll();
if (srsp.getException() instanceof SolrException) {
throw (SolrException)srsp.getException();
} else {
@@ -377,225 +337,4 @@ public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
// TODO: generalize how a comm component can fit into search component framework
// TODO: statics should be per-core singletons
class HttpCommComponent {
// We want an executor that doesn't take up any resources if
// it's not used, so it could be created statically for
// the distributed search component if desired.
//
// Consider CallerRuns policy and a lower max threads to throttle
// requests at some point (or should we simply return failure?)
static Executor commExecutor = new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
new SynchronousQueue<Runnable>() // directly hand off tasks
);
static HttpClient client;
static Random r = new Random();
static LBHttpSolrServer loadbalancer;
static {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(20);
mgr.getParams().setMaxTotalConnections(10000);
mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
// mgr.getParams().setStaleCheckingEnabled(false);
client = new HttpClient(mgr);
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
try {
loadbalancer = new LBHttpSolrServer(client);
} catch (MalformedURLException e) {
// should be impossible since we're not passing any URLs here
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
}
}
CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
// This is primarily to keep track of what order we should use to query the replicas of a shard
// so that we use the same replica for all phases of a distributed request.
Map<String,List<String>> shardToURLs = new HashMap<String,List<String>>();
HttpCommComponent() {
}
private static class SimpleSolrResponse extends SolrResponse {
long elapsedTime;
NamedList<Object> nl;
@Override
public long getElapsedTime() {
return elapsedTime;
}
@Override
public NamedList<Object> getResponse() {
return nl;
}
@Override
public void setResponse(NamedList<Object> rsp) {
nl = rsp;
}
}
// Not thread safe... don't use in Callable.
// Don't modify the returned URL list.
private List<String> getURLs(String shard) {
List<String> urls = shardToURLs.get(shard);
if (urls==null) {
urls = StrUtils.splitSmart(shard,"|",true);
// convert shard to URL
for (int i=0; i<urls.size(); i++) {
urls.set(i, SearchHandler.scheme + urls.get(i));
}
//
// Shuffle the list instead of use round-robin by default.
// This prevents accidental synchronization where multiple shards could get in sync
// and query the same replica at the same time.
//
if (urls.size() > 1)
Collections.shuffle(urls, r);
shardToURLs.put(shard, urls);
}
return urls;
}
void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
// do this outside of the callable for thread safety reasons
final List<String> urls = getURLs(shard);
Callable<ShardResponse> task = new Callable<ShardResponse>() {
public ShardResponse call() throws Exception {
ShardResponse srsp = new ShardResponse();
srsp.setShardRequest(sreq);
srsp.setShard(shard);
SimpleSolrResponse ssr = new SimpleSolrResponse();
srsp.setSolrResponse(ssr);
long startTime = System.currentTimeMillis();
try {
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
// SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
req.setMethod(SolrRequest.METHOD.POST);
// no need to set the response parser as binary is the default
// req.setResponseParser(new BinaryResponseParser());
// if there are no shards available for a slice, urls.size()==0
if (urls.size()==0) {
// TODO: what's the right error code here? We should use the same thing when
// all of the servers for a shard are down.
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
}
if (urls.size() <= 1) {
String url = urls.get(0);
srsp.setShardAddress(url);
SolrServer server = new CommonsHttpSolrServer(url, client);
ssr.nl = server.request(req);
} else {
LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
}
} catch (Throwable th) {
srsp.setException(th);
if (th instanceof SolrException) {
srsp.setResponseCode(((SolrException)th).code());
} else {
srsp.setResponseCode(-1);
}
}
ssr.elapsedTime = System.currentTimeMillis() - startTime;
return srsp;
}
};
pending.add( completionService.submit(task) );
}
/** returns a ShardResponse of the last response correlated with a ShardRequest */
ShardResponse take() {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
/** returns a ShardResponse of the last response correlated with a ShardRequest,
* or immediately returns a ShardResponse if there was an error detected
*/
ShardResponse takeCompletedOrError() {
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
if (rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same
// request more than once.
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (ExecutionException e) {
// should be impossible... the problem with catching the exception
// at this level is we don't know what ShardRequest it applied to
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
}
}
return null;
}
void cancelAll() {
for (Future<ShardResponse> future : pending) {
// TODO: any issues with interrupting? shouldn't be if
// there are finally blocks to release connections.
future.cancel(true);
}
}
}


@@ -0,0 +1,11 @@
package org.apache.solr.handler.component;
import org.apache.solr.common.params.ModifiableSolrParams;
public abstract class ShardHandler {
public abstract void checkDistributed(ResponseBuilder rb);
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
public abstract ShardResponse takeCompletedOrError();
public abstract void cancelAll();
}
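
The contract mirrors how SearchHandler drives the distributed phases: submit one asynchronous request per shard, then drain responses until each ShardRequest is complete or an error comes back. A condensed sketch of that calling pattern, with error handling simplified; sreq, params and rb stand for the objects the caller has already prepared:

ShardHandler handler = shardHandlerFactory.getShardHandler();
handler.checkDistributed(rb);              // resolve the shards/slices this request covers
for (String shard : sreq.actualShards) {
  handler.submit(sreq, shard, params);     // one request per shard, executed asynchronously
}
ShardResponse srsp;
while ((srsp = handler.takeCompletedOrError()) != null) {
  if (srsp.getException() != null) {
    handler.cancelAll();                   // abort the remaining in-flight requests
    break;                                 // SearchHandler rethrows the exception here instead
  }
  // srsp is the last response of a completed ShardRequest; merge its results
}
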


@@ -0,0 +1,7 @@
package org.apache.solr.handler.component;
public abstract class ShardHandlerFactory {
public abstract ShardHandler getShardHandler();
}
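
Because SearchHandler instantiates whatever class the shardHandlerFactory plugin declares, the communication layer can now be swapped without touching the search components. A hypothetical subclass that simply delegates to the stock HTTP implementation, only to illustrate the extension point; the class name is made up and assumes the same package and imports as the classes above:

public class DelegatingShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized {
  private final HttpShardHandlerFactory delegate = new HttpShardHandlerFactory();
  public void init(PluginInfo info) {
    delegate.init(info);                   // reuse the HTTP factory's timeout and scheme handling
  }
  public ShardHandler getShardHandler() {
    return delegate.getShardHandler();     // could instead wrap the handler with logging or metrics
  }
}
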


@@ -56,12 +56,12 @@ public final class ShardResponse {
return shard;
}
void setShardRequest(ShardRequest rsp)
public void setShardRequest(ShardRequest rsp)
{
this.req = rsp;
}
void setSolrResponse(SolrResponse rsp)
public void setSolrResponse(SolrResponse rsp)
{
this.rsp = rsp;
}