diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 27c7a87988d..20a0bd29177 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -163,7 +163,7 @@ New Features Gun Akkor via Erick Erickson) * SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node - update forwarding. (Joel Bernstein, Mark Miller) + update forwarding. (Joel Bernstein, Shikhar Bhushan, Mark Miller) * SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index b4e41006eb3..68b0a3744bf 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -30,9 +30,11 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import org.apache.http.client.HttpClient; @@ -82,6 +84,7 @@ public class CloudSolrServer extends SolrServer { private int zkClientTimeout = 10000; private volatile String defaultCollection; private final LBHttpSolrServer lbServer; + private final boolean shutdownLBHttpSolrServer; private HttpClient myClient; Random rand = new Random(); @@ -129,6 +132,7 @@ public class CloudSolrServer extends SolrServer { this.myClient = HttpClientUtil.createClient(null); this.lbServer = new LBHttpSolrServer(myClient); this.updatesToLeaders = true; + shutdownLBHttpSolrServer = true; } public CloudSolrServer(String zkHost, boolean updatesToLeaders) @@ -137,6 +141,7 @@ public class CloudSolrServer extends SolrServer { this.myClient = HttpClientUtil.createClient(null); this.lbServer = new LBHttpSolrServer(myClient); this.updatesToLeaders = updatesToLeaders; + shutdownLBHttpSolrServer = true; } /** @@ -148,6 +153,7 @@ public class CloudSolrServer extends SolrServer { this.zkHost = zkHost; this.lbServer = lbServer; this.updatesToLeaders = true; + shutdownLBHttpSolrServer = false; } /** @@ -160,6 +166,7 @@ public class CloudSolrServer extends SolrServer { this.zkHost = zkHost; this.lbServer = lbServer; this.updatesToLeaders = updatesToLeaders; + shutdownLBHttpSolrServer = false; } public ResponseParser getParser() { @@ -266,11 +273,8 @@ public class CloudSolrServer extends SolrServer { routableParams.remove(param); } } - if (params == null) { - return null; - } - String collection = params.get("collection", defaultCollection); + String collection = nonRoutableParams.get("collection", defaultCollection); if (collection == null) { throw new SolrServerException("No collection param specified on request and no default collection has been set."); } @@ -307,47 +311,45 @@ public class CloudSolrServer extends SolrServer { return null; } - Iterator> it = routes.entrySet().iterator(); - long start = System.nanoTime(); - if(this.parallelUpdates) { - ArrayBlockingQueue finishedTasks = new ArrayBlockingQueue(routes.size()); - while (it.hasNext()) { - Map.Entry entry = it.next(); - String url = entry.getKey(); - LBHttpSolrServer.Req lbRequest = entry.getValue(); - threadPool.execute(new RequestTask(url, lbRequest, finishedTasks)); + if (parallelUpdates) { + final Map>> responseFutures = new HashMap>>(); + for (final Map.Entry entry : routes.entrySet()) { + final String url = entry.getKey(); + final LBHttpSolrServer.Req lbRequest = entry.getValue(); + responseFutures.put(url, threadPool.submit(new Callable>() { + @Override + public NamedList call() throws Exception { + return lbServer.request(lbRequest).getResponse(); + } + })); } - while ((shardResponses.size() + exceptions.size()) != routes.size()) { - RequestTask requestTask = null; + for (final Map.Entry>> entry: responseFutures.entrySet()) { + final String url = entry.getKey(); + final Future> responseFuture = entry.getValue(); try { - requestTask = finishedTasks.take(); - } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR, e); - } - - Exception e = requestTask.getException(); - if (e != null) { - exceptions.add(requestTask.getLeader(), e); - } else { - shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse()); + shardResponses.add(url, responseFuture.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + exceptions.add(url, e.getCause()); } } - if(exceptions.size() > 0) { + if (exceptions.size() > 0) { throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes); } } else { - while (it.hasNext()) { - Map.Entry entry = it.next(); + for (Map.Entry entry : routes.entrySet()) { String url = entry.getKey(); LBHttpSolrServer.Req lbRequest = entry.getValue(); - try{ + try { NamedList rsp = lbServer.request(lbRequest).getResponse(); shardResponses.add(url, rsp); - } catch(Exception e) { + } catch (Exception e) { throw new SolrServerException(e); } } @@ -439,44 +441,6 @@ public class CloudSolrServer extends SolrServer { return condensed; } - class RequestTask implements Runnable { - - private LBHttpSolrServer.Req req; - private ArrayBlockingQueue tasks; - private LBHttpSolrServer.Rsp rsp; - private String leader; - private Exception e; - - public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue tasks) { - this.req = req; - this.tasks = tasks; - this.leader = leader; - } - - public void run() { - try { - LBHttpSolrServer lb = new LBHttpSolrServer(myClient); - this.rsp = lb.request(req); - this.tasks.add(this); - } catch (Exception e) { - this.e = e; - this.tasks.add(this); - } - } - - public Exception getException() { - return e; - } - - public String getLeader() { - return this.leader; - } - - public LBHttpSolrServer.Rsp getRsp() { - return rsp; - } - } - class RouteResponse extends NamedList { private NamedList routeResponses; private Map routes; @@ -697,6 +661,11 @@ public class CloudSolrServer extends SolrServer { zkStateReader = null; } } + + if (shutdownLBHttpSolrServer) { + lbServer.shutdown(); + } + if (myClient!=null) { myClient.getConnectionManager().shutdown(); }