From bd4f9b9896adac095001d7cd0d8c6d5ccadc6bda Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Wed, 11 Sep 2013 01:19:45 +0000 Subject: [PATCH] SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node update forwarding. SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1521713 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 16 + .../solr/handler/loader/JavabinLoader.java | 19 +- .../apache/solr/update/AddUpdateCommand.java | 9 +- .../solr/update/SolrCmdDistributor.java | 24 +- .../org/apache/solr/update/UpdateCommand.java | 1 - .../solr/cloud/AliasIntegrationTest.java | 1 + .../solr/cloud/BasicDistributedZkTest.java | 1 + .../solrj/impl/BinaryRequestWriter.java | 2 +- .../client/solrj/impl/CloudSolrServer.java | 363 ++++++++++++++- .../client/solrj/impl/HttpSolrServer.java | 4 +- .../client/solrj/impl/LBHttpSolrServer.java | 25 +- .../request/JavaBinUpdateRequestCodec.java | 32 +- .../client/solrj/request/RequestWriter.java | 7 +- .../client/solrj/request/UpdateRequest.java | 416 +++++++++++++----- .../solrj/request/UpdateRequestExt.java | 252 ----------- .../solrj/impl/CloudSolrServerTest.java | 95 ++++ .../solrj/request/TestUpdateRequestCodec.java | 25 +- .../cloud/AbstractFullDistribZkTestBase.java | 2 + 18 files changed, 886 insertions(+), 408 deletions(-) delete mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index d1d9bfc2843..b3bdca90287 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -89,6 +89,15 @@ Upgrading from Solr 4.4.0 init param option is now deprecated and should be replaced with the more standard . See SOLR-4249 for more details. +* UpdateRequestExt has been removed as part of SOLR-4816. You should use UpdateRequest + instead. + +* CloudSolrServer can now use multiple threads to add documents by default. This is a + small change in runtime semantics when using the bulk add method - you will still + end up with the same exception on a failure, but some documents beyond the one that + failed may have made it in. To get the old, single threaded behavior, set parallel updates + to false on the CloudSolrServer instance. + Detailed Change List ---------------------- @@ -140,7 +149,11 @@ New Features can specify facet.threads to parallelize loading the uninverted fields. In at least one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta, Gun Akkor via Erick Erickson) + +* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node + update forwarding. (Joel Bernstein, Mark Miller) +* SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller) Bug Fixes ---------------------- @@ -223,6 +236,9 @@ Optimizations * SOLR-5057: QueryResultCache should not related with the order of fq's list (Feihong Huang via Erick Erickson) +* SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default. + (Joel Bernstein via Mark Miller) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java index 09014f3855c..d6671414ccf 100644 --- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java +++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java @@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; /** * Update handler which uses the JavaBin format @@ -97,7 +100,7 @@ public class JavabinLoader extends ContentStreamLoader { } catch (EOFException e) { break; // this is expected } - if (update.getDeleteById() != null || update.getDeleteQuery() != null) { + if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) { delete(req, update, processor); } } @@ -118,9 +121,17 @@ public class JavabinLoader extends ContentStreamLoader { delcmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1); } - if(update.getDeleteById() != null) { - for (String s : update.getDeleteById()) { - delcmd.id = s; + if(update.getDeleteByIdMap() != null) { + Set>> entries = update.getDeleteByIdMap().entrySet(); + for (Entry> e : entries) { + delcmd.id = e.getKey(); + Map map = e.getValue(); + if (map != null) { + Long version = (Long) map.get("ver"); + if (version != null) { + delcmd.setVersion(version); + } + } processor.processDelete(delcmd); delcmd.clear(); } diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java index 391cbf1801c..56561750fed 100644 --- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java @@ -17,6 +17,10 @@ package org.apache.solr.update; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexDocument; import org.apache.lucene.index.Term; @@ -28,11 +32,6 @@ import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - /** * */ diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java index f599375a0f6..bf9a15279ed 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java @@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; -import org.apache.solr.client.solrj.request.UpdateRequestExt; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.ZkCoreNodeProps; @@ -152,8 +152,9 @@ public class SolrCmdDistributor { // finish with the pending requests checkResponses(false); - UpdateRequestExt ureq = new UpdateRequestExt(); + UpdateRequest ureq = new UpdateRequest(); ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); + ureq.setParams(params); syncRequest(node, ureq); } @@ -173,7 +174,7 @@ public class SolrCmdDistributor { deleteRequest.cmd = clonedCmd; deleteRequest.params = params; - UpdateRequestExt ureq = new UpdateRequestExt(); + UpdateRequest ureq = new UpdateRequest(); if (cmd.isDeleteById()) { ureq.deleteById(cmd.getId(), cmd.getVersion()); } else { @@ -185,7 +186,7 @@ public class SolrCmdDistributor { } } - private void syncRequest(Node node, UpdateRequestExt ureq) { + private void syncRequest(Node node, UpdateRequest ureq) { Request sreq = new Request(); sreq.node = node; sreq.ureq = ureq; @@ -224,7 +225,7 @@ public class SolrCmdDistributor { // currently, we dont try to piggy back on outstanding adds or deletes - UpdateRequestExt ureq = new UpdateRequestExt(); + UpdateRequest ureq = new UpdateRequest(); ureq.setParams(params); addCommit(ureq, cmd); @@ -265,7 +266,7 @@ public class SolrCmdDistributor { flushDeletes(maxBufferedDeletesPerServer); } - void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) { + void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) { if (cmd == null) return; ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes); @@ -281,14 +282,13 @@ public class SolrCmdDistributor { List alist = adds.get(node); if (alist == null || alist.size() < limit) continue; - UpdateRequestExt ureq = new UpdateRequestExt(); + UpdateRequest ureq = new UpdateRequest(); ModifiableSolrParams combinedParams = new ModifiableSolrParams(); - + for (AddRequest aReq : alist) { AddUpdateCommand cmd = aReq.cmd; combinedParams.add(aReq.params); - ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); } @@ -315,7 +315,7 @@ public class SolrCmdDistributor { for (Node node : nodes) { List dlist = deletes.get(node); if (dlist == null || dlist.size() < limit) continue; - UpdateRequestExt ureq = new UpdateRequestExt(); + UpdateRequest ureq = new UpdateRequest(); ModifiableSolrParams combinedParams = new ModifiableSolrParams(); @@ -354,14 +354,14 @@ public class SolrCmdDistributor { public static class Request { public Node node; - UpdateRequestExt ureq; + UpdateRequest ureq; NamedList ursp; int rspCode; public Exception exception; int retries; } - void submit(UpdateRequestExt ureq, Node node) { + void submit(UpdateRequest ureq, Node node) { Request sreq = new Request(); sreq.node = node; sreq.ureq = ureq; diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java index 4efd11ad5e3..a3632516a3a 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java @@ -17,7 +17,6 @@ package org.apache.solr.update; -import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.request.SolrQueryRequest; diff --git a/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java index bc37e9f9681..0815bf699c7 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java @@ -153,6 +153,7 @@ public class AliasIntegrationTest extends AbstractFullDistribZkTestBase { // search with new cloud client CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); + cloudSolrServer.setParallelUpdates(random().nextBoolean()); query = new SolrQuery("*:*"); query.set("collection", "testalias"); res = cloudSolrServer.query(query); diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index dcb1ddb50f3..63b131289c0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -1104,6 +1104,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { synchronized(this) { try { commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); + commondCloudSolrServer.setParallelUpdates(random().nextBoolean()); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.getLbServer().setConnectionTimeout(15000); commondCloudSolrServer.getLbServer().setSoTimeout(30000); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java index 73e409c5815..5d2f81e0907 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java @@ -41,7 +41,7 @@ public class BinaryRequestWriter extends RequestWriter { if (req instanceof UpdateRequest) { UpdateRequest updateRequest = (UpdateRequest) req; if (isNull(updateRequest.getDocuments()) && - isNull(updateRequest.getDeleteById()) && + isNull(updateRequest.getDeleteByIdMap()) && isNull(updateRequest.getDeleteQuery()) && (updateRequest.getDocIterator() == null) ) { return null; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java index 08a446bf33a..b4e41006eb3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java @@ -30,18 +30,28 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.IsUpdateRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.ImplicitDocRouter; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; @@ -49,7 +59,9 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.KeeperException; @@ -58,6 +70,10 @@ import org.apache.zookeeper.KeeperException; * Instances of this class communicate with Zookeeper to discover * Solr endpoints for SolrCloud collections, and then use the * {@link LBHttpSolrServer} to issue requests. + * + * This class assumes the id field for your documents is called + * 'id' - if this is not the case, you must set the right name + * with {@link #setIdField(String)}. */ public class CloudSolrServer extends SolrServer { private volatile ZkStateReader zkStateReader; @@ -65,7 +81,7 @@ public class CloudSolrServer extends SolrServer { private int zkConnectTimeout = 10000; private int zkClientTimeout = 10000; private volatile String defaultCollection; - private LBHttpSolrServer lbServer; + private final LBHttpSolrServer lbServer; private HttpClient myClient; Random rand = new Random(); @@ -79,8 +95,31 @@ public class CloudSolrServer extends SolrServer { private volatile int lastClusterStateHashCode; private final boolean updatesToLeaders; + private boolean parallelUpdates = true; + private ExecutorService threadPool = Executors + .newCachedThreadPool(new SolrjNamedThreadFactory( + "CloudSolrServer ThreadPool")); + private String idField = "id"; + private final Set NON_ROUTABLE_PARAMS; + { + NON_ROUTABLE_PARAMS = new HashSet(); + NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES); + NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS); + NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER); + NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER); + + NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT); + NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE); + + // Not supported via SolrCloud + // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK); + + } + + - /** * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state, * in the form HOST:PORT. @@ -92,12 +131,13 @@ public class CloudSolrServer extends SolrServer { this.updatesToLeaders = true; } - public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException { + public CloudSolrServer(String zkHost, boolean updatesToLeaders) + throws MalformedURLException { this.zkHost = zkHost; this.myClient = HttpClientUtil.createClient(null); this.lbServer = new LBHttpSolrServer(myClient); this.updatesToLeaders = updatesToLeaders; -} + } /** * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state, @@ -121,10 +161,40 @@ public class CloudSolrServer extends SolrServer { this.lbServer = lbServer; this.updatesToLeaders = updatesToLeaders; } + + public ResponseParser getParser() { + return lbServer.getParser(); + } + + /** + * Note: This setter method is not thread-safe. + * + * @param processor + * Default Response Parser chosen to parse the response if the parser + * were not specified as part of the request. + * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser() + */ + public void setParser(ResponseParser processor) { + lbServer.setParser(processor); + } public ZkStateReader getZkStateReader() { return zkStateReader; } + + /** + * @param idField the field to route documents on. + */ + public void setIdField(String idField) { + this.idField = idField; + } + + /** + * @return the field that updates are routed on. + */ + public String getIdField() { + return idField; + } /** Sets the default collection for request */ public void setDefaultCollection(String collection) { @@ -179,18 +249,293 @@ public class CloudSolrServer extends SolrServer { } } + public void setParallelUpdates(boolean parallelUpdates) { + this.parallelUpdates = parallelUpdates; + } + + private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException { + UpdateRequest updateRequest = (UpdateRequest) request; + ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); + ModifiableSolrParams routableParams = new ModifiableSolrParams(); + ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams(); + + if(params != null) { + nonRoutableParams.add(params); + routableParams.add(params); + for(String param : NON_ROUTABLE_PARAMS) { + routableParams.remove(param); + } + } + if (params == null) { + return null; + } + + String collection = params.get("collection", defaultCollection); + if (collection == null) { + throw new SolrServerException("No collection param specified on request and no default collection has been set."); + } + + + //Check to see if the collection is an alias. + Aliases aliases = zkStateReader.getAliases(); + if(aliases != null) { + Map collectionAliases = aliases.getCollectionAliasMap(); + if(collectionAliases != null && collectionAliases.containsKey(collection)) { + collection = collectionAliases.get(collection); + } + } + + DocCollection col = clusterState.getCollection(collection); + + DocRouter router = col.getRouter(); + + if (router instanceof ImplicitDocRouter) { + // short circuit as optimization + return null; + } + + //Create the URL map, which is keyed on slice name. + //The value is a list of URLs for each replica in the slice. + //The first value in the list is the leader for the slice. + Map> urlMap = buildUrlMap(col); + + NamedList exceptions = new NamedList(); + NamedList shardResponses = new NamedList(); + + Map routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField); + if (routes == null) { + return null; + } + + Iterator> it = routes.entrySet().iterator(); + + long start = System.nanoTime(); + if(this.parallelUpdates) { + ArrayBlockingQueue finishedTasks = new ArrayBlockingQueue(routes.size()); + + while (it.hasNext()) { + Map.Entry entry = it.next(); + String url = entry.getKey(); + LBHttpSolrServer.Req lbRequest = entry.getValue(); + threadPool.execute(new RequestTask(url, lbRequest, finishedTasks)); + } + + while ((shardResponses.size() + exceptions.size()) != routes.size()) { + RequestTask requestTask = null; + try { + requestTask = finishedTasks.take(); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, e); + } + + Exception e = requestTask.getException(); + if (e != null) { + exceptions.add(requestTask.getLeader(), e); + } else { + shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse()); + } + } + + if(exceptions.size() > 0) { + throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes); + } + } else { + while (it.hasNext()) { + Map.Entry entry = it.next(); + String url = entry.getKey(); + LBHttpSolrServer.Req lbRequest = entry.getValue(); + try{ + NamedList rsp = lbServer.request(lbRequest).getResponse(); + shardResponses.add(url, rsp); + } catch(Exception e) { + throw new SolrServerException(e); + } + } + } + + UpdateRequest nonRoutableRequest = null; + List deleteQuery = updateRequest.getDeleteQuery(); + if (deleteQuery != null && deleteQuery.size() > 0) { + UpdateRequest deleteQueryRequest = new UpdateRequest(); + deleteQueryRequest.setDeleteQuery(deleteQuery); + nonRoutableRequest = deleteQueryRequest; + } + + Set paramNames = nonRoutableParams.getParameterNames(); + + Set intersection = new HashSet(paramNames); + intersection.retainAll(NON_ROUTABLE_PARAMS); + + if (nonRoutableRequest != null || intersection.size() > 0) { + if (nonRoutableRequest == null) { + nonRoutableRequest = new UpdateRequest(); + } + nonRoutableRequest.setParams(nonRoutableParams); + List urlList = new ArrayList(); + urlList.addAll(routes.keySet()); + Collections.shuffle(urlList, rand); + LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList); + try { + LBHttpSolrServer.Rsp rsp = lbServer.request(req); + shardResponses.add(urlList.get(0), rsp.getResponse()); + } catch (Exception e) { + throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e); + } + } + + long end = System.nanoTime(); + + RouteResponse rr = condenseResponse(shardResponses, (long)((end - start)/1000000)); + rr.setRouteResponses(shardResponses); + rr.setRoutes(routes); + return rr; + } + + private Map> buildUrlMap(DocCollection col) { + Map> urlMap = new HashMap>(); + Collection slices = col.getActiveSlices(); + Iterator sliceIterator = slices.iterator(); + while (sliceIterator.hasNext()) { + Slice slice = sliceIterator.next(); + String name = slice.getName(); + List urls = new ArrayList(); + Replica leader = slice.getLeader(); + ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); + String url = zkProps.getBaseUrl() + "/" + col.getName(); + urls.add(url); + Collection replicas = slice.getReplicas(); + Iterator replicaIterator = replicas.iterator(); + while (replicaIterator.hasNext()) { + Replica replica = replicaIterator.next(); + if (!replica.getNodeName().equals(leader.getNodeName()) && + !replica.getName().equals(leader.getName())) { + ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica); + String url1 = zkProps1.getBaseUrl() + "/" + col.getName(); + urls.add(url1); + } + } + urlMap.put(name, urls); + } + return urlMap; + } + + public RouteResponse condenseResponse(NamedList response, long timeMillis) { + RouteResponse condensed = new RouteResponse(); + int status = 0; + for(int i=0; i 0) { + status = s; + } + } + + NamedList cheader = new NamedList(); + cheader.add("status", status); + cheader.add("QTime", timeMillis); + condensed.add("responseHeader", cheader); + return condensed; + } + + class RequestTask implements Runnable { + + private LBHttpSolrServer.Req req; + private ArrayBlockingQueue tasks; + private LBHttpSolrServer.Rsp rsp; + private String leader; + private Exception e; + + public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue tasks) { + this.req = req; + this.tasks = tasks; + this.leader = leader; + } + + public void run() { + try { + LBHttpSolrServer lb = new LBHttpSolrServer(myClient); + this.rsp = lb.request(req); + this.tasks.add(this); + } catch (Exception e) { + this.e = e; + this.tasks.add(this); + } + } + + public Exception getException() { + return e; + } + + public String getLeader() { + return this.leader; + } + + public LBHttpSolrServer.Rsp getRsp() { + return rsp; + } + } + + class RouteResponse extends NamedList { + private NamedList routeResponses; + private Map routes; + + public void setRouteResponses(NamedList routeResponses) { + this.routeResponses = routeResponses; + } + + public NamedList getRouteResponses() { + return routeResponses; + } + + public void setRoutes(Map routes) { + this.routes = routes; + } + + public Map getRoutes() { + return routes; + } + + } + + class RouteException extends SolrException { + + private NamedList exceptions; + private Map routes; + + public RouteException(ErrorCode errorCode, NamedList exceptions, Map routes){ + super(errorCode, ((Exception)exceptions.getVal(0)).getMessage(), (Exception)exceptions.getVal(0)); + this.exceptions = exceptions; + this.routes = routes; + } + + public NamedList getExceptions() { + return exceptions; + } + + public Map getRoutes() { + return this.routes; + } + } + @Override public NamedList request(SolrRequest request) throws SolrServerException, IOException { connect(); - // TODO: if you can hash here, you could favor the shard leader - ClusterState clusterState = zkStateReader.getClusterState(); + boolean sendToLeaders = false; List replicas = null; - if (request instanceof IsUpdateRequest && updatesToLeaders) { + if (request instanceof IsUpdateRequest) { + if(request instanceof UpdateRequest) { + NamedList response = directUpdate((AbstractUpdateRequest)request,clusterState); + if(response != null) { + return response; + } + } sendToLeaders = true; replicas = new ArrayList(); } @@ -355,6 +700,10 @@ public class CloudSolrServer extends SolrServer { if (myClient!=null) { myClient.getConnectionManager().shutdown(); } + + if(this.threadPool != null && !this.threadPool.isShutdown()) { + this.threadPool.shutdown(); + } } public LBHttpSolrServer getLbServer() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java index da4e44832b5..afcfe3ed383 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java @@ -98,14 +98,14 @@ public class HttpSolrServer extends SolrServer { * * @see org.apache.solr.client.solrj.impl.BinaryResponseParser */ - protected ResponseParser parser; + protected volatile ResponseParser parser; /** * The RequestWriter used to write all requests to Solr * * @see org.apache.solr.client.solrj.request.RequestWriter */ - protected RequestWriter requestWriter = new RequestWriter(); + protected volatile RequestWriter requestWriter = new RequestWriter(); private final HttpClient httpClient; diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java index 70c45463364..07a2257cf00 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java @@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl; import org.apache.http.client.HttpClient; import org.apache.solr.client.solrj.*; +import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; @@ -93,7 +94,8 @@ public class LBHttpSolrServer extends SolrServer { private final AtomicInteger counter = new AtomicInteger(-1); private static final SolrQuery solrQuery = new SolrQuery("*:*"); - private final ResponseParser parser; + private volatile ResponseParser parser; + private volatile RequestWriter requestWriter; static { solrQuery.setRows(0); @@ -219,11 +221,13 @@ public class LBHttpSolrServer extends SolrServer { } protected HttpSolrServer makeServer(String server) throws MalformedURLException { - return new HttpSolrServer(server, httpClient, parser); + HttpSolrServer s = new HttpSolrServer(server, httpClient, parser); + if (requestWriter != null) { + s.setRequestWriter(requestWriter); + } + return s; } - - /** * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped. * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of @@ -590,6 +594,18 @@ public class LBHttpSolrServer extends SolrServer { return httpClient; } + public ResponseParser getParser() { + return parser; + } + + public void setParser(ResponseParser parser) { + this.parser = parser; + } + + public void setRequestWriter(RequestWriter requestWriter) { + this.requestWriter = requestWriter; + } + @Override protected void finalize() throws Throwable { try { @@ -603,4 +619,5 @@ public class LBHttpSolrServer extends SolrServer { // defaults private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java index e21bfa66d7f..1de906f3547 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java @@ -23,6 +23,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; @@ -63,11 +66,14 @@ public class JavaBinUpdateRequestCodec { if(updateRequest.getDocIterator() != null){ docIter = updateRequest.getDocIterator(); } + + Map> docMap = updateRequest.getDocumentsMap(); nl.add("params", params);// 0: params - nl.add("delById", updateRequest.getDeleteById()); + nl.add("delByIdMap", updateRequest.getDeleteByIdMap()); nl.add("delByQ", updateRequest.getDeleteQuery()); nl.add("docs", docIter); + nl.add("docsMap", docMap); JavaBinCodec codec = new JavaBinCodec(); codec.marshal(nl, os); } @@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec { public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException { final UpdateRequest updateRequest = new UpdateRequest(); List> doclist; + Map> docMap; List delById; + Map delByIdMap; List delByQ; final NamedList[] namedList = new NamedList[1]; JavaBinCodec codec = new JavaBinCodec() { @@ -158,9 +166,11 @@ public class JavaBinUpdateRequestCodec { } } delById = (List) namedList[0].get("delById"); + delByIdMap = (Map) namedList[0].get("delByIdMap"); delByQ = (List) namedList[0].get("delByQ"); doclist = (List) namedList[0].get("docs"); - + docMap = (Map>) namedList[0].get("docsMap"); + if (doclist != null && !doclist.isEmpty()) { List solrInputDocs = new ArrayList(); for (Object o : doclist) { @@ -172,11 +182,29 @@ public class JavaBinUpdateRequestCodec { } updateRequest.add(solrInputDocs); } + if (docMap != null && !docMap.isEmpty()) { + Set>> entries = docMap.entrySet(); + for (Entry> entry : entries) { + Map map = entry.getValue(); + Boolean overwrite = null; + Integer commitWithin = null; + if (map != null) { + overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE); + commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN); + } + updateRequest.add(entry.getKey(), commitWithin, overwrite); + } + } if (delById != null) { for (String s : delById) { updateRequest.deleteById(s); } } + if (delByIdMap != null) { + for (Map.Entry entry : delByIdMap.entrySet()) { + updateRequest.deleteById(entry.getKey(), entry.getValue()); + } + } if (delByQ != null) { for (String s : delByQ) { updateRequest.deleteByQuery(s); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java index ea9ca23231a..8c2d90e3bd5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java @@ -26,6 +26,7 @@ import java.io.*; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.nio.charset.Charset; /** @@ -52,7 +53,7 @@ public class RequestWriter { private boolean isEmpty(UpdateRequest updateRequest) { return isNull(updateRequest.getDocuments()) && - isNull(updateRequest.getDeleteById()) && + isNull(updateRequest.getDeleteByIdMap()) && isNull(updateRequest.getDeleteQuery()) && updateRequest.getDocIterator() == null; } @@ -137,4 +138,8 @@ public class RequestWriter { protected boolean isNull(List l) { return l == null || l.isEmpty(); } + + protected boolean isNull(Map l) { + return l == null || l.isEmpty(); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java index 6432265c5b9..7898079127e 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -22,203 +22,411 @@ import java.io.StringWriter; import java.io.Writer; import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.LinkedHashMap; +import org.apache.solr.client.solrj.impl.LBHttpSolrServer; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocRouter; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.XML; /** * - * + * * @since solr 1.3 */ public class UpdateRequest extends AbstractUpdateRequest { - private List documents = null; + public static final String OVERWRITE = "ow"; + public static final String COMMIT_WITHIN = "cw"; + private Map> documents = null; private Iterator docIterator = null; - private List deleteById = null; + private Map> deleteById = null; private List deleteQuery = null; - - public UpdateRequest() - { - super( METHOD.POST, "/update" ); + + public UpdateRequest() { + super(METHOD.POST, "/update"); } - + public UpdateRequest(String url) { super(METHOD.POST, url); } - - //--------------------------------------------------------------------------- - //--------------------------------------------------------------------------- + + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- /** * clear the pending documents and delete commands */ - public void clear() - { - if( documents != null ) { + public void clear() { + if (documents != null) { documents.clear(); } - if( deleteById != null ) { + if (deleteById != null) { deleteById.clear(); } - if( deleteQuery != null ) { + if (deleteQuery != null) { deleteQuery.clear(); } } - //--------------------------------------------------------------------------- - //--------------------------------------------------------------------------- + // --------------------------------------------------------------------------- + // --------------------------------------------------------------------------- - public UpdateRequest add( final SolrInputDocument doc ) - { - if( documents == null ) { - documents = new ArrayList( 2 ); + public UpdateRequest add(final SolrInputDocument doc) { + if (documents == null) { + documents = new LinkedHashMap>(); } - documents.add( doc ); + documents.put(doc, null); return this; } - public UpdateRequest add( final Collection docs ) - { - if( documents == null ) { - documents = new ArrayList( docs.size()+1 ); + public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) { + return add(doc, null, overwrite); + } + + public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) { + return add(doc, commitWithin, null); + } + + public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin, + Boolean overwrite) { + if (documents == null) { + documents = new LinkedHashMap>(); } - documents.addAll( docs ); + Map params = new HashMap(2); + if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin); + if (overwrite != null) params.put(OVERWRITE, overwrite); + + documents.put(doc, params); + return this; } - public UpdateRequest deleteById( String id ) - { - if( deleteById == null ) { - deleteById = new ArrayList(); + public UpdateRequest add(final Collection docs) { + if (documents == null) { + documents = new LinkedHashMap>(); } - deleteById.add( id ); - return this; - } - public UpdateRequest deleteById( List ids ) - { - if( deleteById == null ) { - deleteById = new ArrayList(ids); - } else { - deleteById.addAll(ids); + for (SolrInputDocument doc : docs) { + documents.put(doc, null); } return this; } - public UpdateRequest deleteByQuery( String q ) - { - if( deleteQuery == null ) { + public UpdateRequest deleteById(String id) { + if (deleteById == null) { + deleteById = new LinkedHashMap>(); + } + deleteById.put(id, null); + return this; + } + + public UpdateRequest deleteById(List ids) { + if (deleteById == null) { + deleteById = new LinkedHashMap>(); + } + + for (String id : ids) { + deleteById.put(id, null); + } + + return this; + } + + public UpdateRequest deleteById(String id, Long version) { + if (deleteById == null) { + deleteById = new LinkedHashMap>(); + } + Map params = new HashMap(1); + params.put("ver", version); + deleteById.put(id, params); + return this; + } + + public UpdateRequest deleteByQuery(String q) { + if (deleteQuery == null) { deleteQuery = new ArrayList(); } - deleteQuery.add( q ); + deleteQuery.add(q); return this; } + + /** + * @param router to route updates with + * @param col DocCollection for the updates + * @param urlMap of the cluster + * @param params params to use + * @param idField the id field + * @return a Map of urls to requests + */ + public Map getRoutes(DocRouter router, + DocCollection col, Map> urlMap, + ModifiableSolrParams params, String idField) { + + if ((documents == null || documents.size() == 0) + && (deleteById == null || deleteById.size() == 0)) { + return null; + } + + Map routes = new HashMap(); + if (documents != null) { + Set>> entries = documents.entrySet(); + for (Entry> entry : entries) { + SolrInputDocument doc = entry.getKey(); + Object id = doc.getFieldValue(idField); + if (id == null) { + return null; + } + Slice slice = router.getTargetSlice(id + .toString(), doc, null, col); + if (slice == null) { + return null; + } + List urls = urlMap.get(slice.getName()); + String leaderUrl = urls.get(0); + LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes + .get(leaderUrl); + if (request == null) { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.setMethod(getMethod()); + updateRequest.setCommitWithin(getCommitWithin()); + updateRequest.setParams(params); + updateRequest.setPath(getPath()); + request = new LBHttpSolrServer.Req(updateRequest, urls); + routes.put(leaderUrl, request); + } + UpdateRequest urequest = (UpdateRequest) request.getRequest(); + urequest.add(doc); + } + } + + // Route the deleteById's + + if (deleteById != null) { + + Iterator>> entries = deleteById.entrySet() + .iterator(); + while (entries.hasNext()) { + + Map.Entry> entry = entries.next(); + + String deleteId = entry.getKey(); + Map map = entry.getValue(); + Long version = null; + if (map != null) { + version = (Long) map.get("ver"); + } + Slice slice = router.getTargetSlice(deleteId, null, null, col); + if (slice == null) { + return null; + } + List urls = urlMap.get(slice.getName()); + String leaderUrl = urls.get(0); + LBHttpSolrServer.Req request = routes.get(leaderUrl); + if (request != null) { + UpdateRequest urequest = (UpdateRequest) request.getRequest(); + urequest.deleteById(deleteId, version); + } else { + UpdateRequest urequest = new UpdateRequest(); + urequest.deleteById(deleteId, version); + request = new LBHttpSolrServer.Req(urequest, urls); + routes.put(leaderUrl, request); + } + } + } + return routes; + } + public void setDocIterator(Iterator docIterator) { this.docIterator = docIterator; } - - //-------------------------------------------------------------------------- - //-------------------------------------------------------------------------- - + + public void setDeleteQuery(List deleteQuery) { + this.deleteQuery = deleteQuery; + } + + // -------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + @Override public Collection getContentStreams() throws IOException { - return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML ); + return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML); } - + public String getXML() throws IOException { StringWriter writer = new StringWriter(); - writeXML( writer ); + writeXML(writer); writer.flush(); - + // If action is COMMIT or OPTIMIZE, it is sent with params String xml = writer.toString(); - //System.out.println( "SEND:"+xml ); + // System.out.println( "SEND:"+xml ); return (xml.length() > 0) ? xml : null; } + private List>> getDocLists(Map> documents) { + List>> docLists = new ArrayList>>(); + Map> docList = null; + if (this.documents != null) { + + Boolean lastOverwrite = true; + Integer lastCommitWithin = -1; + + Set>> entries = this.documents + .entrySet(); + for (Entry> entry : entries) { + Map map = entry.getValue(); + Boolean overwrite = null; + Integer commitWithin = null; + if (map != null) { + overwrite = (Boolean) entry.getValue().get(OVERWRITE); + commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN); + } + if (overwrite != lastOverwrite || commitWithin != lastCommitWithin + || docLists.size() == 0) { + docList = new LinkedHashMap>(); + docLists.add(docList); + } + docList.put(entry.getKey(), entry.getValue()); + lastCommitWithin = commitWithin; + lastOverwrite = overwrite; + } + } + + if (docIterator != null) { + docList = new LinkedHashMap>(); + docLists.add(docList); + while (docIterator.hasNext()) { + SolrInputDocument doc = docIterator.next(); + if (doc != null) { + docList.put(doc, null); + } + } + + } + + return docLists; + } + /** * @since solr 1.4 */ - public void writeXML( Writer writer ) throws IOException { - if( (documents != null && documents.size() > 0) || docIterator != null) { - if( commitWithin > 0 ) { - writer.write(""); - } - else { - writer.write(""); - } - if(documents != null) { - for (SolrInputDocument doc : documents) { - if (doc != null) { - ClientUtils.writeXML(doc, writer); - } + public void writeXML(Writer writer) throws IOException { + List>> getDocLists = getDocLists(documents); + + for (Map> docs : getDocLists) { + + if ((docs != null && docs.size() > 0)) { + Entry> firstDoc = docs.entrySet() + .iterator().next(); + Map map = firstDoc.getValue(); + Integer cw = null; + Boolean ow = null; + if (map != null) { + cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN); + ow = (Boolean) firstDoc.getValue().get(OVERWRITE); } - } - if (docIterator != null) { - while (docIterator.hasNext()) { - SolrInputDocument doc = docIterator.next(); - if (doc != null) { - ClientUtils.writeXML(doc, writer); - } + if (ow == null) ow = true; + int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin; + boolean overwrite = ow; + if (commitWithin > -1 || overwrite != true) { + writer.write(""); + } else { + writer.write(""); } + + Set>> entries = docs + .entrySet(); + for (Entry> entry : entries) { + ClientUtils.writeXML(entry.getKey(), writer); + } + + writer.write(""); } - writer.write(""); } // Add the delete commands boolean deleteI = deleteById != null && deleteById.size() > 0; boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0; - if( deleteI || deleteQ ) { - if(commitWithin>0) { - writer.append( "" ); + if (deleteI || deleteQ) { + if (commitWithin > 0) { + writer.append(""); } else { - writer.append( "" ); + writer.append(""); } - if( deleteI ) { - for( String id : deleteById ) { - writer.append( "" ); - XML.escapeCharData( id, writer ); - writer.append( "" ); + if (deleteI) { + for (Map.Entry> entry : deleteById.entrySet()) { + writer.append(" map = entry.getValue(); + if (map != null) { + Long version = (Long) map.get("ver"); + if (version != null) { + writer.append(" version=\"" + version + "\""); + } + } + writer.append(">"); + + XML.escapeCharData(entry.getKey(), writer); + writer.append(""); } } - if( deleteQ ) { - for( String q : deleteQuery ) { - writer.append( "" ); - XML.escapeCharData( q, writer ); - writer.append( "" ); + if (deleteQ) { + for (String q : deleteQuery) { + writer.append(""); + XML.escapeCharData(q, writer); + writer.append(""); } } - writer.append( "" ); + writer.append(""); } } - - - //-------------------------------------------------------------------------- - //-------------------------------------------------------------------------- - - //-------------------------------------------------------------------------- - // - //-------------------------------------------------------------------------- - + + // -------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + + // -------------------------------------------------------------------------- + // + // -------------------------------------------------------------------------- + public List getDocuments() { + if (documents == null) return null; + List docs = new ArrayList(documents.size()); + docs.addAll(documents.keySet()); + return docs; + } + + public Map> getDocumentsMap() { return documents; } - + public Iterator getDocIterator() { return docIterator; } - + public List getDeleteById() { + if (deleteById == null) return null; + List deletes = new ArrayList(deleteById.keySet()); + return deletes; + } + + public Map> getDeleteByIdMap() { return deleteById; } - + public List getDeleteQuery() { return deleteQuery; } - + } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java deleted file mode 100644 index 4908a25be10..00000000000 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java +++ /dev/null @@ -1,252 +0,0 @@ -package org.apache.solr.client.solrj.request; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.io.StringWriter; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.solr.client.solrj.util.ClientUtils; -import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.util.ContentStream; -import org.apache.solr.common.util.XML; - -// TODO: bake this into UpdateRequest -public class UpdateRequestExt extends AbstractUpdateRequest { - - private List documents = null; - private Map deleteById = null; - private List deleteQuery = null; - - private class SolrDoc { - @Override - public String toString() { - return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin - + ", overwrite=" + overwrite + "]"; - } - SolrInputDocument document; - int commitWithin; - boolean overwrite; - } - - public UpdateRequestExt() { - super(METHOD.POST, "/update"); - } - - public UpdateRequestExt(String url) { - super(METHOD.POST, url); - } - - // --------------------------------------------------------------------------- - // --------------------------------------------------------------------------- - - /** - * clear the pending documents and delete commands - */ - public void clear() { - if (documents != null) { - documents.clear(); - } - if (deleteById != null) { - deleteById.clear(); - } - if (deleteQuery != null) { - deleteQuery.clear(); - } - } - - // --------------------------------------------------------------------------- - // --------------------------------------------------------------------------- - - public UpdateRequestExt add(final SolrInputDocument doc) { - if (documents == null) { - documents = new ArrayList(2); - } - SolrDoc solrDoc = new SolrDoc(); - solrDoc.document = doc; - solrDoc.commitWithin = -1; - solrDoc.overwrite = true; - documents.add(solrDoc); - - return this; - } - - public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin, - boolean overwrite) { - if (documents == null) { - documents = new ArrayList(2); - } - SolrDoc solrDoc = new SolrDoc(); - solrDoc.document = doc; - solrDoc.commitWithin = commitWithin; - solrDoc.overwrite = overwrite; - documents.add(solrDoc); - - return this; - } - - public UpdateRequestExt deleteById(String id) { - if (deleteById == null) { - deleteById = new HashMap(); - } - deleteById.put(id, null); - return this; - } - - public UpdateRequestExt deleteById(String id, Long version) { - if (deleteById == null) { - deleteById = new HashMap(); - } - deleteById.put(id, version); - return this; - } - - public UpdateRequestExt deleteById(List ids) { - if (deleteById == null) { - deleteById = new HashMap(); - } else { - for (String id : ids) { - deleteById.put(id, null); - } - } - return this; - } - - public UpdateRequestExt deleteByQuery(String q) { - if (deleteQuery == null) { - deleteQuery = new ArrayList(); - } - deleteQuery.add(q); - return this; - } - - // -------------------------------------------------------------------------- - // -------------------------------------------------------------------------- - - @Override - public Collection getContentStreams() throws IOException { - return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML); - } - - public String getXML() throws IOException { - StringWriter writer = new StringWriter(); - writeXML(writer); - writer.flush(); - - String xml = writer.toString(); - - return (xml.length() > 0) ? xml : null; - } - - public void writeXML(Writer writer) throws IOException { - List> getDocLists = getDocLists(documents); - - for (List docs : getDocLists) { - - if ((docs != null && docs.size() > 0)) { - SolrDoc firstDoc = docs.get(0); - int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin; - boolean overwrite = firstDoc.overwrite; - if (commitWithin > -1 || overwrite != true) { - writer.write(""); - } else { - writer.write(""); - } - if (documents != null) { - for (SolrDoc doc : documents) { - if (doc != null) { - ClientUtils.writeXML(doc.document, writer); - } - } - } - - writer.write(""); - } - } - - // Add the delete commands - boolean deleteI = deleteById != null && deleteById.size() > 0; - boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0; - if (deleteI || deleteQ) { - writer.append(""); - if (deleteI) { - for (Map.Entry entry : deleteById.entrySet()) { - writer.append(""); - - XML.escapeCharData(entry.getKey(), writer); - writer.append(""); - } - } - if (deleteQ) { - for (String q : deleteQuery) { - writer.append(""); - XML.escapeCharData(q, writer); - writer.append(""); - } - } - writer.append(""); - } - } - - private List> getDocLists(List documents) { - List> docLists = new ArrayList>(); - if (this.documents == null) { - return docLists; - } - boolean lastOverwrite = true; - int lastCommitWithin = -1; - List docList = null; - for (SolrDoc doc : this.documents) { - if (doc.overwrite != lastOverwrite - || doc.commitWithin != lastCommitWithin || docLists.size() == 0) { - docList = new ArrayList(); - docLists.add(docList); - } - docList.add(doc); - lastCommitWithin = doc.commitWithin; - lastOverwrite = doc.overwrite; - } - - return docLists; - } - - public Map getDeleteById() { - return deleteById; - } - - public List getDeleteQuery() { - return deleteQuery; - } - - @Override - public String toString() { - return "UpdateRequestExt [documents=" + documents + ", deleteById=" - + deleteById + ", deleteQuery=" + deleteQuery + "]"; - } - -} diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java index 3f99eaa5230..bfa311e17c6 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java @@ -19,13 +19,22 @@ package org.apache.solr.client.solrj.impl; import java.io.File; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractZkTestCase; +import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.util.ExternalPaths; import org.junit.After; import org.junit.AfterClass; @@ -100,6 +109,92 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase { del("*:*"); + commit(); + + SolrInputDocument doc1 = new SolrInputDocument(); + doc1.addField(id, "0"); + doc1.addField("a_t", "hello1"); + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField(id, "2"); + doc2.addField("a_t", "hello2"); + + UpdateRequest request = new UpdateRequest(); + request.add(doc1); + request.add(doc2); + request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false); + + // Test single threaded routed updates for UpdateRequest + NamedList response = cloudClient.request(request); + CloudSolrServer.RouteResponse rr = (CloudSolrServer.RouteResponse) response; + Map routes = rr.getRoutes(); + Iterator> it = routes.entrySet() + .iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + String url = entry.getKey(); + UpdateRequest updateRequest = (UpdateRequest) entry.getValue() + .getRequest(); + SolrInputDocument doc = updateRequest.getDocuments().get(0); + String id = doc.getField("id").getValue().toString(); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add("q", "id:" + id); + params.add("distrib", "false"); + QueryRequest queryRequest = new QueryRequest(params); + HttpSolrServer solrServer = new HttpSolrServer(url); + QueryResponse queryResponse = queryRequest.process(solrServer); + SolrDocumentList docList = queryResponse.getResults(); + assertTrue(docList.getNumFound() == 1); + } + + // Test the deleteById routing for UpdateRequest + + UpdateRequest delRequest = new UpdateRequest(); + delRequest.deleteById("0"); + delRequest.deleteById("2"); + delRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false); + cloudClient.request(delRequest); + ModifiableSolrParams qParams = new ModifiableSolrParams(); + qParams.add("q", "*:*"); + QueryRequest qRequest = new QueryRequest(qParams); + QueryResponse qResponse = qRequest.process(cloudClient); + SolrDocumentList docs = qResponse.getResults(); + assertTrue(docs.getNumFound() == 0); + + // Test Multi-Threaded routed updates for UpdateRequest + + CloudSolrServer threadedClient = null; + try { + threadedClient = new CloudSolrServer(zkServer.getZkAddress()); + threadedClient.setParallelUpdates(true); + threadedClient.setDefaultCollection("collection1"); + response = threadedClient.request(request); + rr = (CloudSolrServer.RouteResponse) response; + routes = rr.getRoutes(); + it = routes.entrySet() + .iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + String url = entry.getKey(); + UpdateRequest updateRequest = (UpdateRequest) entry.getValue() + .getRequest(); + SolrInputDocument doc = updateRequest.getDocuments().get(0); + String id = doc.getField("id").getValue().toString(); + ModifiableSolrParams params = new ModifiableSolrParams(); + params.add("q", "id:" + id); + params.add("distrib", "false"); + QueryRequest queryRequest = new QueryRequest(params); + HttpSolrServer solrServer = new HttpSolrServer(url); + QueryResponse queryResponse = queryRequest.process(solrServer); + SolrDocumentList docList = queryResponse.getResults(); + assertTrue(docList.getNumFound() == 1); + } + } finally { + threadedClient.shutdown(); + } + + del("*:*"); + commit(); + indexr(id, 0, "a_t", "to come to the aid of their country."); CloudJettyRunner shard1Leader = shardToLeaderJetty.get("shard1"); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java index e3f9632e51a..f9b3d56dc80 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java @@ -16,24 +16,23 @@ */ package org.apache.solr.client.solrj.request; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import junit.framework.Assert; import org.apache.lucene.util.LuceneTestCase; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; -import org.apache.solr.common.util.FastInputStream; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Set; -import java.util.HashSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ArrayList; - /** * Test for UpdateRequestCodec * @@ -93,7 +92,7 @@ public class TestUpdateRequestCodec extends LuceneTestCase { }; UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler); - Assert.assertNull(updateUnmarshalled.getDocuments()); + for (SolrInputDocument document : docs) { updateUnmarshalled.add(document); } @@ -144,7 +143,7 @@ public class TestUpdateRequestCodec extends LuceneTestCase { }; UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler); - Assert.assertNull(updateUnmarshalled.getDocuments()); + for (SolrInputDocument document : docs) { updateUnmarshalled.add(document); } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 3086a88c6af..7e6f2102a96 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -246,6 +246,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes protected CloudSolrServer createCloudClient(String defaultCollection) throws MalformedURLException { CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); + server.setParallelUpdates(random().nextBoolean()); if (defaultCollection != null) server.setDefaultCollection(defaultCollection); server.getLbServer().getHttpClient().getParams() .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000); @@ -1696,6 +1697,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes synchronized(this) { try { commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); + commondCloudSolrServer.setParallelUpdates(random().nextBoolean()); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.connect(); } catch (MalformedURLException e) {