mirror of https://github.com/apache/lucene.git
SOLR-4816: Don't create "loads" of LBHttpSolrServer's, shutdown LBHttpSolrServer when appropriate, get collection from nonRoutableParams.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1522684 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f2b3d010b3
commit
942dabba8c
|
@ -163,7 +163,7 @@ New Features
|
|||
Gun Akkor via Erick Erickson)
|
||||
|
||||
* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
|
||||
update forwarding. (Joel Bernstein, Mark Miller)
|
||||
update forwarding. (Joel Bernstein, Shikhar Bhushan, Mark Miller)
|
||||
|
||||
* SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller)
|
||||
|
||||
|
|
|
@ -30,9 +30,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.http.client.HttpClient;
|
||||
|
@ -82,6 +84,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
private int zkClientTimeout = 10000;
|
||||
private volatile String defaultCollection;
|
||||
private final LBHttpSolrServer lbServer;
|
||||
private final boolean shutdownLBHttpSolrServer;
|
||||
private HttpClient myClient;
|
||||
Random rand = new Random();
|
||||
|
||||
|
@ -129,6 +132,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
this.myClient = HttpClientUtil.createClient(null);
|
||||
this.lbServer = new LBHttpSolrServer(myClient);
|
||||
this.updatesToLeaders = true;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
}
|
||||
|
||||
public CloudSolrServer(String zkHost, boolean updatesToLeaders)
|
||||
|
@ -137,6 +141,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
this.myClient = HttpClientUtil.createClient(null);
|
||||
this.lbServer = new LBHttpSolrServer(myClient);
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
shutdownLBHttpSolrServer = true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -148,6 +153,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
this.zkHost = zkHost;
|
||||
this.lbServer = lbServer;
|
||||
this.updatesToLeaders = true;
|
||||
shutdownLBHttpSolrServer = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,6 +166,7 @@ public class CloudSolrServer extends SolrServer {
|
|||
this.zkHost = zkHost;
|
||||
this.lbServer = lbServer;
|
||||
this.updatesToLeaders = updatesToLeaders;
|
||||
shutdownLBHttpSolrServer = false;
|
||||
}
|
||||
|
||||
public ResponseParser getParser() {
|
||||
|
@ -266,11 +273,8 @@ public class CloudSolrServer extends SolrServer {
|
|||
routableParams.remove(param);
|
||||
}
|
||||
}
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String collection = params.get("collection", defaultCollection);
|
||||
String collection = nonRoutableParams.get("collection", defaultCollection);
|
||||
if (collection == null) {
|
||||
throw new SolrServerException("No collection param specified on request and no default collection has been set.");
|
||||
}
|
||||
|
@ -307,47 +311,45 @@ public class CloudSolrServer extends SolrServer {
|
|||
return null;
|
||||
}
|
||||
|
||||
Iterator<Map.Entry<String, LBHttpSolrServer.Req>> it = routes.entrySet().iterator();
|
||||
|
||||
long start = System.nanoTime();
|
||||
if(this.parallelUpdates) {
|
||||
ArrayBlockingQueue<RequestTask> finishedTasks = new ArrayBlockingQueue<RequestTask>(routes.size());
|
||||
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
|
||||
String url = entry.getKey();
|
||||
LBHttpSolrServer.Req lbRequest = entry.getValue();
|
||||
threadPool.execute(new RequestTask(url, lbRequest, finishedTasks));
|
||||
if (parallelUpdates) {
|
||||
final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<String, Future<NamedList<?>>>();
|
||||
for (final Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
|
||||
final String url = entry.getKey();
|
||||
final LBHttpSolrServer.Req lbRequest = entry.getValue();
|
||||
responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
|
||||
@Override
|
||||
public NamedList<?> call() throws Exception {
|
||||
return lbServer.request(lbRequest).getResponse();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
while ((shardResponses.size() + exceptions.size()) != routes.size()) {
|
||||
RequestTask requestTask = null;
|
||||
for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
|
||||
final String url = entry.getKey();
|
||||
final Future<NamedList<?>> responseFuture = entry.getValue();
|
||||
try {
|
||||
requestTask = finishedTasks.take();
|
||||
} catch (Exception e) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, e);
|
||||
}
|
||||
|
||||
Exception e = requestTask.getException();
|
||||
if (e != null) {
|
||||
exceptions.add(requestTask.getLeader(), e);
|
||||
} else {
|
||||
shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse());
|
||||
shardResponses.add(url, responseFuture.get());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
} catch (ExecutionException e) {
|
||||
exceptions.add(url, e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
if(exceptions.size() > 0) {
|
||||
if (exceptions.size() > 0) {
|
||||
throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
|
||||
}
|
||||
} else {
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
|
||||
for (Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
|
||||
String url = entry.getKey();
|
||||
LBHttpSolrServer.Req lbRequest = entry.getValue();
|
||||
try{
|
||||
try {
|
||||
NamedList rsp = lbServer.request(lbRequest).getResponse();
|
||||
shardResponses.add(url, rsp);
|
||||
} catch(Exception e) {
|
||||
} catch (Exception e) {
|
||||
throw new SolrServerException(e);
|
||||
}
|
||||
}
|
||||
|
@ -439,44 +441,6 @@ public class CloudSolrServer extends SolrServer {
|
|||
return condensed;
|
||||
}
|
||||
|
||||
class RequestTask implements Runnable {
|
||||
|
||||
private LBHttpSolrServer.Req req;
|
||||
private ArrayBlockingQueue<RequestTask> tasks;
|
||||
private LBHttpSolrServer.Rsp rsp;
|
||||
private String leader;
|
||||
private Exception e;
|
||||
|
||||
public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue<RequestTask> tasks) {
|
||||
this.req = req;
|
||||
this.tasks = tasks;
|
||||
this.leader = leader;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
LBHttpSolrServer lb = new LBHttpSolrServer(myClient);
|
||||
this.rsp = lb.request(req);
|
||||
this.tasks.add(this);
|
||||
} catch (Exception e) {
|
||||
this.e = e;
|
||||
this.tasks.add(this);
|
||||
}
|
||||
}
|
||||
|
||||
public Exception getException() {
|
||||
return e;
|
||||
}
|
||||
|
||||
public String getLeader() {
|
||||
return this.leader;
|
||||
}
|
||||
|
||||
public LBHttpSolrServer.Rsp getRsp() {
|
||||
return rsp;
|
||||
}
|
||||
}
|
||||
|
||||
class RouteResponse extends NamedList {
|
||||
private NamedList routeResponses;
|
||||
private Map<String, LBHttpSolrServer.Req> routes;
|
||||
|
@ -697,6 +661,11 @@ public class CloudSolrServer extends SolrServer {
|
|||
zkStateReader = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (shutdownLBHttpSolrServer) {
|
||||
lbServer.shutdown();
|
||||
}
|
||||
|
||||
if (myClient!=null) {
|
||||
myClient.getConnectionManager().shutdown();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue