SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node update forwarding.

SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin.
SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1521713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-09-11 01:19:45 +00:00
parent 88a00de5b7
commit bd4f9b9896
18 changed files with 886 additions and 408 deletions

View File

@ -89,6 +89,15 @@ Upgrading from Solr 4.4.0
init param option is now deprecated and should be replaced with the more standard init param option is now deprecated and should be replaced with the more standard
<arr name="fieldName">. See SOLR-4249 for more details. <arr name="fieldName">. See SOLR-4249 for more details.
* UpdateRequestExt has been removed as part of SOLR-4816. You should use UpdateRequest
instead.
* CloudSolrServer can now use multiple threads to add documents by default. This is a
small change in runtime semantics when using the bulk add method - you will still
end up with the same exception on a failure, but some documents beyond the one that
failed may have made it in. To get the old, single threaded behavior, set parallel updates
to false on the CloudSolrServer instance.
Detailed Change List Detailed Change List
---------------------- ----------------------
@ -141,6 +150,10 @@ New Features
one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta, one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta,
Gun Akkor via Erick Erickson) Gun Akkor via Erick Erickson)
* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
update forwarding. (Joel Bernstein, Mark Miller)
* SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller)
Bug Fixes Bug Fixes
---------------------- ----------------------
@ -223,6 +236,9 @@ Optimizations
* SOLR-5057: QueryResultCache should not related with the order of fq's list (Feihong Huang via Erick Erickson) * SOLR-5057: QueryResultCache should not related with the order of fq's list (Feihong Huang via Erick Erickson)
* SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.
(Joel Bernstein via Mark Miller)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/** /**
* Update handler which uses the JavaBin format * Update handler which uses the JavaBin format
@ -97,7 +100,7 @@ public class JavabinLoader extends ContentStreamLoader {
} catch (EOFException e) { } catch (EOFException e) {
break; // this is expected break; // this is expected
} }
if (update.getDeleteById() != null || update.getDeleteQuery() != null) { if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) {
delete(req, update, processor); delete(req, update, processor);
} }
} }
@ -118,9 +121,17 @@ public class JavabinLoader extends ContentStreamLoader {
delcmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1); delcmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
} }
if(update.getDeleteById() != null) { if(update.getDeleteByIdMap() != null) {
for (String s : update.getDeleteById()) { Set<Entry<String,Map<String,Object>>> entries = update.getDeleteByIdMap().entrySet();
delcmd.id = s; for (Entry<String,Map<String,Object>> e : entries) {
delcmd.id = e.getKey();
Map<String,Object> map = e.getValue();
if (map != null) {
Long version = (Long) map.get("ver");
if (version != null) {
delcmd.setVersion(version);
}
}
processor.processDelete(delcmd); processor.processDelete(delcmd);
delcmd.clear(); delcmd.clear();
} }

View File

@ -17,6 +17,10 @@
package org.apache.solr.update; package org.apache.solr.update;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexDocument; import org.apache.lucene.index.IndexDocument;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
@ -28,11 +32,6 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/** /**
* *
*/ */

View File

@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExecutionException;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequestExt; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -152,8 +152,9 @@ public class SolrCmdDistributor {
// finish with the pending requests // finish with the pending requests
checkResponses(false); checkResponses(false);
UpdateRequestExt ureq = new UpdateRequestExt(); UpdateRequest ureq = new UpdateRequest();
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
ureq.setParams(params); ureq.setParams(params);
syncRequest(node, ureq); syncRequest(node, ureq);
} }
@ -173,7 +174,7 @@ public class SolrCmdDistributor {
deleteRequest.cmd = clonedCmd; deleteRequest.cmd = clonedCmd;
deleteRequest.params = params; deleteRequest.params = params;
UpdateRequestExt ureq = new UpdateRequestExt(); UpdateRequest ureq = new UpdateRequest();
if (cmd.isDeleteById()) { if (cmd.isDeleteById()) {
ureq.deleteById(cmd.getId(), cmd.getVersion()); ureq.deleteById(cmd.getId(), cmd.getVersion());
} else { } else {
@ -185,7 +186,7 @@ public class SolrCmdDistributor {
} }
} }
private void syncRequest(Node node, UpdateRequestExt ureq) { private void syncRequest(Node node, UpdateRequest ureq) {
Request sreq = new Request(); Request sreq = new Request();
sreq.node = node; sreq.node = node;
sreq.ureq = ureq; sreq.ureq = ureq;
@ -224,7 +225,7 @@ public class SolrCmdDistributor {
// currently, we dont try to piggy back on outstanding adds or deletes // currently, we dont try to piggy back on outstanding adds or deletes
UpdateRequestExt ureq = new UpdateRequestExt(); UpdateRequest ureq = new UpdateRequest();
ureq.setParams(params); ureq.setParams(params);
addCommit(ureq, cmd); addCommit(ureq, cmd);
@ -265,7 +266,7 @@ public class SolrCmdDistributor {
flushDeletes(maxBufferedDeletesPerServer); flushDeletes(maxBufferedDeletesPerServer);
} }
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) { void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
if (cmd == null) return; if (cmd == null) return;
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes); : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
@ -281,14 +282,13 @@ public class SolrCmdDistributor {
List<AddRequest> alist = adds.get(node); List<AddRequest> alist = adds.get(node);
if (alist == null || alist.size() < limit) continue; if (alist == null || alist.size() < limit) continue;
UpdateRequestExt ureq = new UpdateRequestExt(); UpdateRequest ureq = new UpdateRequest();
ModifiableSolrParams combinedParams = new ModifiableSolrParams(); ModifiableSolrParams combinedParams = new ModifiableSolrParams();
for (AddRequest aReq : alist) { for (AddRequest aReq : alist) {
AddUpdateCommand cmd = aReq.cmd; AddUpdateCommand cmd = aReq.cmd;
combinedParams.add(aReq.params); combinedParams.add(aReq.params);
ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite); ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
} }
@ -315,7 +315,7 @@ public class SolrCmdDistributor {
for (Node node : nodes) { for (Node node : nodes) {
List<DeleteRequest> dlist = deletes.get(node); List<DeleteRequest> dlist = deletes.get(node);
if (dlist == null || dlist.size() < limit) continue; if (dlist == null || dlist.size() < limit) continue;
UpdateRequestExt ureq = new UpdateRequestExt(); UpdateRequest ureq = new UpdateRequest();
ModifiableSolrParams combinedParams = new ModifiableSolrParams(); ModifiableSolrParams combinedParams = new ModifiableSolrParams();
@ -354,14 +354,14 @@ public class SolrCmdDistributor {
public static class Request { public static class Request {
public Node node; public Node node;
UpdateRequestExt ureq; UpdateRequest ureq;
NamedList<Object> ursp; NamedList<Object> ursp;
int rspCode; int rspCode;
public Exception exception; public Exception exception;
int retries; int retries;
} }
void submit(UpdateRequestExt ureq, Node node) { void submit(UpdateRequest ureq, Node node) {
Request sreq = new Request(); Request sreq = new Request();
sreq.node = node; sreq.node = node;
sreq.ureq = ureq; sreq.ureq = ureq;

View File

@ -17,7 +17,6 @@
package org.apache.solr.update; package org.apache.solr.update;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;

View File

@ -153,6 +153,7 @@ public class AliasIntegrationTest extends AbstractFullDistribZkTestBase {
// search with new cloud client // search with new cloud client
CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
cloudSolrServer.setParallelUpdates(random().nextBoolean());
query = new SolrQuery("*:*"); query = new SolrQuery("*:*");
query.set("collection", "testalias"); query.set("collection", "testalias");
res = cloudSolrServer.query(query); res = cloudSolrServer.query(query);

View File

@ -1104,6 +1104,7 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
synchronized(this) { synchronized(this) {
try { try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.getLbServer().setConnectionTimeout(15000); commondCloudSolrServer.getLbServer().setConnectionTimeout(15000);
commondCloudSolrServer.getLbServer().setSoTimeout(30000); commondCloudSolrServer.getLbServer().setSoTimeout(30000);

View File

@ -41,7 +41,7 @@ public class BinaryRequestWriter extends RequestWriter {
if (req instanceof UpdateRequest) { if (req instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) req; UpdateRequest updateRequest = (UpdateRequest) req;
if (isNull(updateRequest.getDocuments()) && if (isNull(updateRequest.getDocuments()) &&
isNull(updateRequest.getDeleteById()) && isNull(updateRequest.getDeleteByIdMap()) &&
isNull(updateRequest.getDeleteQuery()) isNull(updateRequest.getDeleteQuery())
&& (updateRequest.getDocIterator() == null) ) { && (updateRequest.getDocIterator() == null) ) {
return null; return null;

View File

@ -30,18 +30,28 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.IsUpdateRequest; import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
@ -49,7 +59,9 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -58,6 +70,10 @@ import org.apache.zookeeper.KeeperException;
* Instances of this class communicate with Zookeeper to discover * Instances of this class communicate with Zookeeper to discover
* Solr endpoints for SolrCloud collections, and then use the * Solr endpoints for SolrCloud collections, and then use the
* {@link LBHttpSolrServer} to issue requests. * {@link LBHttpSolrServer} to issue requests.
*
* This class assumes the id field for your documents is called
* 'id' - if this is not the case, you must set the right name
* with {@link #setIdField(String)}.
*/ */
public class CloudSolrServer extends SolrServer { public class CloudSolrServer extends SolrServer {
private volatile ZkStateReader zkStateReader; private volatile ZkStateReader zkStateReader;
@ -65,7 +81,7 @@ public class CloudSolrServer extends SolrServer {
private int zkConnectTimeout = 10000; private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000; private int zkClientTimeout = 10000;
private volatile String defaultCollection; private volatile String defaultCollection;
private LBHttpSolrServer lbServer; private final LBHttpSolrServer lbServer;
private HttpClient myClient; private HttpClient myClient;
Random rand = new Random(); Random rand = new Random();
@ -79,6 +95,29 @@ public class CloudSolrServer extends SolrServer {
private volatile int lastClusterStateHashCode; private volatile int lastClusterStateHashCode;
private final boolean updatesToLeaders; private final boolean updatesToLeaders;
private boolean parallelUpdates = true;
private ExecutorService threadPool = Executors
.newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<String>();
NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
// Not supported via SolrCloud
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
}
/** /**
@ -92,7 +131,8 @@ public class CloudSolrServer extends SolrServer {
this.updatesToLeaders = true; this.updatesToLeaders = true;
} }
public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException { public CloudSolrServer(String zkHost, boolean updatesToLeaders)
throws MalformedURLException {
this.zkHost = zkHost; this.zkHost = zkHost;
this.myClient = HttpClientUtil.createClient(null); this.myClient = HttpClientUtil.createClient(null);
this.lbServer = new LBHttpSolrServer(myClient); this.lbServer = new LBHttpSolrServer(myClient);
@ -122,10 +162,40 @@ public class CloudSolrServer extends SolrServer {
this.updatesToLeaders = updatesToLeaders; this.updatesToLeaders = updatesToLeaders;
} }
public ResponseParser getParser() {
return lbServer.getParser();
}
/**
* Note: This setter method is <b>not thread-safe</b>.
*
* @param processor
* Default Response Parser chosen to parse the response if the parser
* were not specified as part of the request.
* @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
*/
public void setParser(ResponseParser processor) {
lbServer.setParser(processor);
}
public ZkStateReader getZkStateReader() { public ZkStateReader getZkStateReader() {
return zkStateReader; return zkStateReader;
} }
/**
* @param idField the field to route documents on.
*/
public void setIdField(String idField) {
this.idField = idField;
}
/**
* @return the field that updates are routed on.
*/
public String getIdField() {
return idField;
}
/** Sets the default collection for request */ /** Sets the default collection for request */
public void setDefaultCollection(String collection) { public void setDefaultCollection(String collection) {
this.defaultCollection = collection; this.defaultCollection = collection;
@ -179,18 +249,293 @@ public class CloudSolrServer extends SolrServer {
} }
} }
public void setParallelUpdates(boolean parallelUpdates) {
this.parallelUpdates = parallelUpdates;
}
private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request;
ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
ModifiableSolrParams routableParams = new ModifiableSolrParams();
ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
if(params != null) {
nonRoutableParams.add(params);
routableParams.add(params);
for(String param : NON_ROUTABLE_PARAMS) {
routableParams.remove(param);
}
}
if (params == null) {
return null;
}
String collection = params.get("collection", defaultCollection);
if (collection == null) {
throw new SolrServerException("No collection param specified on request and no default collection has been set.");
}
//Check to see if the collection is an alias.
Aliases aliases = zkStateReader.getAliases();
if(aliases != null) {
Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
if(collectionAliases != null && collectionAliases.containsKey(collection)) {
collection = collectionAliases.get(collection);
}
}
DocCollection col = clusterState.getCollection(collection);
DocRouter router = col.getRouter();
if (router instanceof ImplicitDocRouter) {
// short circuit as optimization
return null;
}
//Create the URL map, which is keyed on slice name.
//The value is a list of URLs for each replica in the slice.
//The first value in the list is the leader for the slice.
Map<String,List<String>> urlMap = buildUrlMap(col);
NamedList exceptions = new NamedList();
NamedList shardResponses = new NamedList();
Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
if (routes == null) {
return null;
}
Iterator<Map.Entry<String, LBHttpSolrServer.Req>> it = routes.entrySet().iterator();
long start = System.nanoTime();
if(this.parallelUpdates) {
ArrayBlockingQueue<RequestTask> finishedTasks = new ArrayBlockingQueue<RequestTask>(routes.size());
while (it.hasNext()) {
Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
String url = entry.getKey();
LBHttpSolrServer.Req lbRequest = entry.getValue();
threadPool.execute(new RequestTask(url, lbRequest, finishedTasks));
}
while ((shardResponses.size() + exceptions.size()) != routes.size()) {
RequestTask requestTask = null;
try {
requestTask = finishedTasks.take();
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
Exception e = requestTask.getException();
if (e != null) {
exceptions.add(requestTask.getLeader(), e);
} else {
shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse());
}
}
if(exceptions.size() > 0) {
throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
}
} else {
while (it.hasNext()) {
Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
String url = entry.getKey();
LBHttpSolrServer.Req lbRequest = entry.getValue();
try{
NamedList rsp = lbServer.request(lbRequest).getResponse();
shardResponses.add(url, rsp);
} catch(Exception e) {
throw new SolrServerException(e);
}
}
}
UpdateRequest nonRoutableRequest = null;
List<String> deleteQuery = updateRequest.getDeleteQuery();
if (deleteQuery != null && deleteQuery.size() > 0) {
UpdateRequest deleteQueryRequest = new UpdateRequest();
deleteQueryRequest.setDeleteQuery(deleteQuery);
nonRoutableRequest = deleteQueryRequest;
}
Set<String> paramNames = nonRoutableParams.getParameterNames();
Set<String> intersection = new HashSet<String>(paramNames);
intersection.retainAll(NON_ROUTABLE_PARAMS);
if (nonRoutableRequest != null || intersection.size() > 0) {
if (nonRoutableRequest == null) {
nonRoutableRequest = new UpdateRequest();
}
nonRoutableRequest.setParams(nonRoutableParams);
List<String> urlList = new ArrayList<String>();
urlList.addAll(routes.keySet());
Collections.shuffle(urlList, rand);
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList);
try {
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
shardResponses.add(urlList.get(0), rsp.getResponse());
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
}
}
long end = System.nanoTime();
RouteResponse rr = condenseResponse(shardResponses, (long)((end - start)/1000000));
rr.setRouteResponses(shardResponses);
rr.setRoutes(routes);
return rr;
}
private Map<String,List<String>> buildUrlMap(DocCollection col) {
Map<String, List<String>> urlMap = new HashMap<String, List<String>>();
Collection<Slice> slices = col.getActiveSlices();
Iterator<Slice> sliceIterator = slices.iterator();
while (sliceIterator.hasNext()) {
Slice slice = sliceIterator.next();
String name = slice.getName();
List<String> urls = new ArrayList<String>();
Replica leader = slice.getLeader();
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
String url = zkProps.getBaseUrl() + "/" + col.getName();
urls.add(url);
Collection<Replica> replicas = slice.getReplicas();
Iterator<Replica> replicaIterator = replicas.iterator();
while (replicaIterator.hasNext()) {
Replica replica = replicaIterator.next();
if (!replica.getNodeName().equals(leader.getNodeName()) &&
!replica.getName().equals(leader.getName())) {
ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
String url1 = zkProps1.getBaseUrl() + "/" + col.getName();
urls.add(url1);
}
}
urlMap.put(name, urls);
}
return urlMap;
}
public RouteResponse condenseResponse(NamedList response, long timeMillis) {
RouteResponse condensed = new RouteResponse();
int status = 0;
for(int i=0; i<response.size(); i++) {
NamedList shardResponse = (NamedList)response.getVal(i);
NamedList header = (NamedList)shardResponse.get("responseHeader");
Integer shardStatus = (Integer)header.get("status");
int s = shardStatus.intValue();
if(s > 0) {
status = s;
}
}
NamedList cheader = new NamedList();
cheader.add("status", status);
cheader.add("QTime", timeMillis);
condensed.add("responseHeader", cheader);
return condensed;
}
class RequestTask implements Runnable {
private LBHttpSolrServer.Req req;
private ArrayBlockingQueue<RequestTask> tasks;
private LBHttpSolrServer.Rsp rsp;
private String leader;
private Exception e;
public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue<RequestTask> tasks) {
this.req = req;
this.tasks = tasks;
this.leader = leader;
}
public void run() {
try {
LBHttpSolrServer lb = new LBHttpSolrServer(myClient);
this.rsp = lb.request(req);
this.tasks.add(this);
} catch (Exception e) {
this.e = e;
this.tasks.add(this);
}
}
public Exception getException() {
return e;
}
public String getLeader() {
return this.leader;
}
public LBHttpSolrServer.Rsp getRsp() {
return rsp;
}
}
class RouteResponse extends NamedList {
private NamedList routeResponses;
private Map<String, LBHttpSolrServer.Req> routes;
public void setRouteResponses(NamedList routeResponses) {
this.routeResponses = routeResponses;
}
public NamedList getRouteResponses() {
return routeResponses;
}
public void setRoutes(Map<String, LBHttpSolrServer.Req> routes) {
this.routes = routes;
}
public Map<String, LBHttpSolrServer.Req> getRoutes() {
return routes;
}
}
class RouteException extends SolrException {
private NamedList exceptions;
private Map<String, LBHttpSolrServer.Req> routes;
public RouteException(ErrorCode errorCode, NamedList exceptions, Map<String, LBHttpSolrServer.Req> routes){
super(errorCode, ((Exception)exceptions.getVal(0)).getMessage(), (Exception)exceptions.getVal(0));
this.exceptions = exceptions;
this.routes = routes;
}
public NamedList getExceptions() {
return exceptions;
}
public Map<String, LBHttpSolrServer.Req> getRoutes() {
return this.routes;
}
}
@Override @Override
public NamedList<Object> request(SolrRequest request) public NamedList<Object> request(SolrRequest request)
throws SolrServerException, IOException { throws SolrServerException, IOException {
connect(); connect();
// TODO: if you can hash here, you could favor the shard leader
ClusterState clusterState = zkStateReader.getClusterState(); ClusterState clusterState = zkStateReader.getClusterState();
boolean sendToLeaders = false; boolean sendToLeaders = false;
List<String> replicas = null; List<String> replicas = null;
if (request instanceof IsUpdateRequest && updatesToLeaders) { if (request instanceof IsUpdateRequest) {
if(request instanceof UpdateRequest) {
NamedList response = directUpdate((AbstractUpdateRequest)request,clusterState);
if(response != null) {
return response;
}
}
sendToLeaders = true; sendToLeaders = true;
replicas = new ArrayList<String>(); replicas = new ArrayList<String>();
} }
@ -355,6 +700,10 @@ public class CloudSolrServer extends SolrServer {
if (myClient!=null) { if (myClient!=null) {
myClient.getConnectionManager().shutdown(); myClient.getConnectionManager().shutdown();
} }
if(this.threadPool != null && !this.threadPool.isShutdown()) {
this.threadPool.shutdown();
}
} }
public LBHttpSolrServer getLbServer() { public LBHttpSolrServer getLbServer() {

View File

@ -98,14 +98,14 @@ public class HttpSolrServer extends SolrServer {
* *
* @see org.apache.solr.client.solrj.impl.BinaryResponseParser * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
*/ */
protected ResponseParser parser; protected volatile ResponseParser parser;
/** /**
* The RequestWriter used to write all requests to Solr * The RequestWriter used to write all requests to Solr
* *
* @see org.apache.solr.client.solrj.request.RequestWriter * @see org.apache.solr.client.solrj.request.RequestWriter
*/ */
protected RequestWriter requestWriter = new RequestWriter(); protected volatile RequestWriter requestWriter = new RequestWriter();
private final HttpClient httpClient; private final HttpClient httpClient;

View File

@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.impl;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.*; import org.apache.solr.client.solrj.*;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
@ -93,7 +94,8 @@ public class LBHttpSolrServer extends SolrServer {
private final AtomicInteger counter = new AtomicInteger(-1); private final AtomicInteger counter = new AtomicInteger(-1);
private static final SolrQuery solrQuery = new SolrQuery("*:*"); private static final SolrQuery solrQuery = new SolrQuery("*:*");
private final ResponseParser parser; private volatile ResponseParser parser;
private volatile RequestWriter requestWriter;
static { static {
solrQuery.setRows(0); solrQuery.setRows(0);
@ -219,10 +221,12 @@ public class LBHttpSolrServer extends SolrServer {
} }
protected HttpSolrServer makeServer(String server) throws MalformedURLException { protected HttpSolrServer makeServer(String server) throws MalformedURLException {
return new HttpSolrServer(server, httpClient, parser); HttpSolrServer s = new HttpSolrServer(server, httpClient, parser);
if (requestWriter != null) {
s.setRequestWriter(requestWriter);
}
return s;
} }
/** /**
* Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped. * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
@ -590,6 +594,18 @@ public class LBHttpSolrServer extends SolrServer {
return httpClient; return httpClient;
} }
public ResponseParser getParser() {
return parser;
}
public void setParser(ResponseParser parser) {
this.parser = parser;
}
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
@Override @Override
protected void finalize() throws Throwable { protected void finalize() throws Throwable {
try { try {
@ -603,4 +619,5 @@ public class LBHttpSolrServer extends SolrServer {
// defaults // defaults
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
} }

View File

@ -23,6 +23,9 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
@ -64,10 +67,13 @@ public class JavaBinUpdateRequestCodec {
docIter = updateRequest.getDocIterator(); docIter = updateRequest.getDocIterator();
} }
Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
nl.add("params", params);// 0: params nl.add("params", params);// 0: params
nl.add("delById", updateRequest.getDeleteById()); nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
nl.add("delByQ", updateRequest.getDeleteQuery()); nl.add("delByQ", updateRequest.getDeleteQuery());
nl.add("docs", docIter); nl.add("docs", docIter);
nl.add("docsMap", docMap);
JavaBinCodec codec = new JavaBinCodec(); JavaBinCodec codec = new JavaBinCodec();
codec.marshal(nl, os); codec.marshal(nl, os);
} }
@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec {
public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException { public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
final UpdateRequest updateRequest = new UpdateRequest(); final UpdateRequest updateRequest = new UpdateRequest();
List<List<NamedList>> doclist; List<List<NamedList>> doclist;
Map<SolrInputDocument,Map<String,Object>> docMap;
List<String> delById; List<String> delById;
Map<String,Long> delByIdMap;
List<String> delByQ; List<String> delByQ;
final NamedList[] namedList = new NamedList[1]; final NamedList[] namedList = new NamedList[1];
JavaBinCodec codec = new JavaBinCodec() { JavaBinCodec codec = new JavaBinCodec() {
@ -158,8 +166,10 @@ public class JavaBinUpdateRequestCodec {
} }
} }
delById = (List<String>) namedList[0].get("delById"); delById = (List<String>) namedList[0].get("delById");
delByIdMap = (Map<String,Long>) namedList[0].get("delByIdMap");
delByQ = (List<String>) namedList[0].get("delByQ"); delByQ = (List<String>) namedList[0].get("delByQ");
doclist = (List) namedList[0].get("docs"); doclist = (List) namedList[0].get("docs");
docMap = (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
if (doclist != null && !doclist.isEmpty()) { if (doclist != null && !doclist.isEmpty()) {
List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>(); List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
@ -172,11 +182,29 @@ public class JavaBinUpdateRequestCodec {
} }
updateRequest.add(solrInputDocs); updateRequest.add(solrInputDocs);
} }
if (docMap != null && !docMap.isEmpty()) {
Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
Map<String,Object> map = entry.getValue();
Boolean overwrite = null;
Integer commitWithin = null;
if (map != null) {
overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
}
updateRequest.add(entry.getKey(), commitWithin, overwrite);
}
}
if (delById != null) { if (delById != null) {
for (String s : delById) { for (String s : delById) {
updateRequest.deleteById(s); updateRequest.deleteById(s);
} }
} }
if (delByIdMap != null) {
for (Map.Entry<String,Long> entry : delByIdMap.entrySet()) {
updateRequest.deleteById(entry.getKey(), entry.getValue());
}
}
if (delByQ != null) { if (delByQ != null) {
for (String s : delByQ) { for (String s : delByQ) {
updateRequest.deleteByQuery(s); updateRequest.deleteByQuery(s);

View File

@ -26,6 +26,7 @@ import java.io.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.nio.charset.Charset; import java.nio.charset.Charset;
/** /**
@ -52,7 +53,7 @@ public class RequestWriter {
private boolean isEmpty(UpdateRequest updateRequest) { private boolean isEmpty(UpdateRequest updateRequest) {
return isNull(updateRequest.getDocuments()) && return isNull(updateRequest.getDocuments()) &&
isNull(updateRequest.getDeleteById()) && isNull(updateRequest.getDeleteByIdMap()) &&
isNull(updateRequest.getDeleteQuery()) && isNull(updateRequest.getDeleteQuery()) &&
updateRequest.getDocIterator() == null; updateRequest.getDocIterator() == null;
} }
@ -137,4 +138,8 @@ public class RequestWriter {
protected boolean isNull(List l) { protected boolean isNull(List l) {
return l == null || l.isEmpty(); return l == null || l.isEmpty();
} }
protected boolean isNull(Map l) {
return l == null || l.isEmpty();
}
} }

View File

@ -22,11 +22,21 @@ import java.io.StringWriter;
import java.io.Writer; import java.io.Writer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.LinkedHashMap;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.XML; import org.apache.solr.common.util.XML;
@ -37,13 +47,14 @@ import org.apache.solr.common.util.XML;
*/ */
public class UpdateRequest extends AbstractUpdateRequest { public class UpdateRequest extends AbstractUpdateRequest {
private List<SolrInputDocument> documents = null; public static final String OVERWRITE = "ow";
public static final String COMMIT_WITHIN = "cw";
private Map<SolrInputDocument,Map<String,Object>> documents = null;
private Iterator<SolrInputDocument> docIterator = null; private Iterator<SolrInputDocument> docIterator = null;
private List<String> deleteById = null; private Map<String,Map<String,Object>> deleteById = null;
private List<String> deleteQuery = null; private List<String> deleteQuery = null;
public UpdateRequest() public UpdateRequest() {
{
super(METHOD.POST, "/update"); super(METHOD.POST, "/update");
} }
@ -57,8 +68,7 @@ public class UpdateRequest extends AbstractUpdateRequest {
/** /**
* clear the pending documents and delete commands * clear the pending documents and delete commands
*/ */
public void clear() public void clear() {
{
if (documents != null) { if (documents != null) {
documents.clear(); documents.clear();
} }
@ -73,44 +83,77 @@ public class UpdateRequest extends AbstractUpdateRequest {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
public UpdateRequest add( final SolrInputDocument doc ) public UpdateRequest add(final SolrInputDocument doc) {
{
if (documents == null) { if (documents == null) {
documents = new ArrayList<SolrInputDocument>( 2 ); documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
} }
documents.add( doc ); documents.put(doc, null);
return this; return this;
} }
public UpdateRequest add( final Collection<SolrInputDocument> docs ) public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) {
{ return add(doc, null, overwrite);
}
public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) {
return add(doc, commitWithin, null);
}
public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin,
Boolean overwrite) {
if (documents == null) { if (documents == null) {
documents = new ArrayList<SolrInputDocument>( docs.size()+1 ); documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
} }
documents.addAll( docs ); Map<String,Object> params = new HashMap<String,Object>(2);
if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin);
if (overwrite != null) params.put(OVERWRITE, overwrite);
documents.put(doc, params);
return this; return this;
} }
public UpdateRequest deleteById( String id ) public UpdateRequest add(final Collection<SolrInputDocument> docs) {
{ if (documents == null) {
if( deleteById == null ) { documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
deleteById = new ArrayList<String>();
} }
deleteById.add( id ); for (SolrInputDocument doc : docs) {
return this; documents.put(doc, null);
}
public UpdateRequest deleteById( List<String> ids )
{
if( deleteById == null ) {
deleteById = new ArrayList<String>(ids);
} else {
deleteById.addAll(ids);
} }
return this; return this;
} }
public UpdateRequest deleteByQuery( String q ) public UpdateRequest deleteById(String id) {
{ if (deleteById == null) {
deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
deleteById.put(id, null);
return this;
}
public UpdateRequest deleteById(List<String> ids) {
if (deleteById == null) {
deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
for (String id : ids) {
deleteById.put(id, null);
}
return this;
}
public UpdateRequest deleteById(String id, Long version) {
if (deleteById == null) {
deleteById = new LinkedHashMap<String,Map<String,Object>>();
}
Map<String,Object> params = new HashMap<String,Object>(1);
params.put("ver", version);
deleteById.put(id, params);
return this;
}
public UpdateRequest deleteByQuery(String q) {
if (deleteQuery == null) { if (deleteQuery == null) {
deleteQuery = new ArrayList<String>(); deleteQuery = new ArrayList<String>();
} }
@ -118,10 +161,101 @@ public class UpdateRequest extends AbstractUpdateRequest {
return this; return this;
} }
/**
* @param router to route updates with
* @param col DocCollection for the updates
* @param urlMap of the cluster
* @param params params to use
* @param idField the id field
* @return a Map of urls to requests
*/
public Map<String,LBHttpSolrServer.Req> getRoutes(DocRouter router,
DocCollection col, Map<String,List<String>> urlMap,
ModifiableSolrParams params, String idField) {
if ((documents == null || documents.size() == 0)
&& (deleteById == null || deleteById.size() == 0)) {
return null;
}
Map<String,LBHttpSolrServer.Req> routes = new HashMap<String,LBHttpSolrServer.Req>();
if (documents != null) {
Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
SolrInputDocument doc = entry.getKey();
Object id = doc.getFieldValue(idField);
if (id == null) {
return null;
}
Slice slice = router.getTargetSlice(id
.toString(), doc, null, col);
if (slice == null) {
return null;
}
List<String> urls = urlMap.get(slice.getName());
String leaderUrl = urls.get(0);
LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes
.get(leaderUrl);
if (request == null) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.setMethod(getMethod());
updateRequest.setCommitWithin(getCommitWithin());
updateRequest.setParams(params);
updateRequest.setPath(getPath());
request = new LBHttpSolrServer.Req(updateRequest, urls);
routes.put(leaderUrl, request);
}
UpdateRequest urequest = (UpdateRequest) request.getRequest();
urequest.add(doc);
}
}
// Route the deleteById's
if (deleteById != null) {
Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
.iterator();
while (entries.hasNext()) {
Map.Entry<String,Map<String,Object>> entry = entries.next();
String deleteId = entry.getKey();
Map<String,Object> map = entry.getValue();
Long version = null;
if (map != null) {
version = (Long) map.get("ver");
}
Slice slice = router.getTargetSlice(deleteId, null, null, col);
if (slice == null) {
return null;
}
List<String> urls = urlMap.get(slice.getName());
String leaderUrl = urls.get(0);
LBHttpSolrServer.Req request = routes.get(leaderUrl);
if (request != null) {
UpdateRequest urequest = (UpdateRequest) request.getRequest();
urequest.deleteById(deleteId, version);
} else {
UpdateRequest urequest = new UpdateRequest();
urequest.deleteById(deleteId, version);
request = new LBHttpSolrServer.Req(urequest, urls);
routes.put(leaderUrl, request);
}
}
}
return routes;
}
public void setDocIterator(Iterator<SolrInputDocument> docIterator) { public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
this.docIterator = docIterator; this.docIterator = docIterator;
} }
public void setDeleteQuery(List<String> deleteQuery) {
this.deleteQuery = deleteQuery;
}
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
@ -141,34 +275,87 @@ public class UpdateRequest extends AbstractUpdateRequest {
return (xml.length() > 0) ? xml : null; return (xml.length() > 0) ? xml : null;
} }
private List<Map<SolrInputDocument,Map<String,Object>>> getDocLists(Map<SolrInputDocument,Map<String,Object>> documents) {
List<Map<SolrInputDocument,Map<String,Object>>> docLists = new ArrayList<Map<SolrInputDocument,Map<String,Object>>>();
Map<SolrInputDocument,Map<String,Object>> docList = null;
if (this.documents != null) {
Boolean lastOverwrite = true;
Integer lastCommitWithin = -1;
Set<Entry<SolrInputDocument,Map<String,Object>>> entries = this.documents
.entrySet();
for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
Map<String,Object> map = entry.getValue();
Boolean overwrite = null;
Integer commitWithin = null;
if (map != null) {
overwrite = (Boolean) entry.getValue().get(OVERWRITE);
commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN);
}
if (overwrite != lastOverwrite || commitWithin != lastCommitWithin
|| docLists.size() == 0) {
docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
docLists.add(docList);
}
docList.put(entry.getKey(), entry.getValue());
lastCommitWithin = commitWithin;
lastOverwrite = overwrite;
}
}
if (docIterator != null) {
docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
docLists.add(docList);
while (docIterator.hasNext()) {
SolrInputDocument doc = docIterator.next();
if (doc != null) {
docList.put(doc, null);
}
}
}
return docLists;
}
/** /**
* @since solr 1.4 * @since solr 1.4
*/ */
public void writeXML(Writer writer) throws IOException { public void writeXML(Writer writer) throws IOException {
if( (documents != null && documents.size() > 0) || docIterator != null) { List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
if( commitWithin > 0 ) {
writer.write("<add commitWithin=\""+commitWithin+"\">"); for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
if ((docs != null && docs.size() > 0)) {
Entry<SolrInputDocument,Map<String,Object>> firstDoc = docs.entrySet()
.iterator().next();
Map<String,Object> map = firstDoc.getValue();
Integer cw = null;
Boolean ow = null;
if (map != null) {
cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
} }
else { if (ow == null) ow = true;
int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin;
boolean overwrite = ow;
if (commitWithin > -1 || overwrite != true) {
writer.write("<add commitWithin=\"" + commitWithin + "\" "
+ "overwrite=\"" + overwrite + "\">");
} else {
writer.write("<add>"); writer.write("<add>");
} }
if(documents != null) {
for (SolrInputDocument doc : documents) { Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docs
if (doc != null) { .entrySet();
ClientUtils.writeXML(doc, writer); for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
} ClientUtils.writeXML(entry.getKey(), writer);
}
}
if (docIterator != null) {
while (docIterator.hasNext()) {
SolrInputDocument doc = docIterator.next();
if (doc != null) {
ClientUtils.writeXML(doc, writer);
}
}
} }
writer.write("</add>"); writer.write("</add>");
} }
}
// Add the delete commands // Add the delete commands
boolean deleteI = deleteById != null && deleteById.size() > 0; boolean deleteI = deleteById != null && deleteById.size() > 0;
@ -180,9 +367,18 @@ public class UpdateRequest extends AbstractUpdateRequest {
writer.append("<delete>"); writer.append("<delete>");
} }
if (deleteI) { if (deleteI) {
for( String id : deleteById ) { for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
writer.append( "<id>" ); writer.append("<id");
XML.escapeCharData( id, writer ); Map<String,Object> map = entry.getValue();
if (map != null) {
Long version = (Long) map.get("ver");
if (version != null) {
writer.append(" version=\"" + version + "\"");
}
}
writer.append(">");
XML.escapeCharData(entry.getKey(), writer);
writer.append("</id>"); writer.append("</id>");
} }
} }
@ -197,7 +393,6 @@ public class UpdateRequest extends AbstractUpdateRequest {
} }
} }
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
@ -206,6 +401,13 @@ public class UpdateRequest extends AbstractUpdateRequest {
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
public List<SolrInputDocument> getDocuments() { public List<SolrInputDocument> getDocuments() {
if (documents == null) return null;
List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(documents.size());
docs.addAll(documents.keySet());
return docs;
}
public Map<SolrInputDocument,Map<String,Object>> getDocumentsMap() {
return documents; return documents;
} }
@ -214,6 +416,12 @@ public class UpdateRequest extends AbstractUpdateRequest {
} }
public List<String> getDeleteById() { public List<String> getDeleteById() {
if (deleteById == null) return null;
List<String> deletes = new ArrayList<String>(deleteById.keySet());
return deletes;
}
public Map<String,Map<String,Object>> getDeleteByIdMap() {
return deleteById; return deleteById;
} }

View File

@ -1,252 +0,0 @@
package org.apache.solr.client.solrj.request;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.XML;
// TODO: bake this into UpdateRequest
public class UpdateRequestExt extends AbstractUpdateRequest {
private List<SolrDoc> documents = null;
private Map<String,Long> deleteById = null;
private List<String> deleteQuery = null;
private class SolrDoc {
@Override
public String toString() {
return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin
+ ", overwrite=" + overwrite + "]";
}
SolrInputDocument document;
int commitWithin;
boolean overwrite;
}
public UpdateRequestExt() {
super(METHOD.POST, "/update");
}
public UpdateRequestExt(String url) {
super(METHOD.POST, url);
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
/**
* clear the pending documents and delete commands
*/
public void clear() {
if (documents != null) {
documents.clear();
}
if (deleteById != null) {
deleteById.clear();
}
if (deleteQuery != null) {
deleteQuery.clear();
}
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
public UpdateRequestExt add(final SolrInputDocument doc) {
if (documents == null) {
documents = new ArrayList<SolrDoc>(2);
}
SolrDoc solrDoc = new SolrDoc();
solrDoc.document = doc;
solrDoc.commitWithin = -1;
solrDoc.overwrite = true;
documents.add(solrDoc);
return this;
}
public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin,
boolean overwrite) {
if (documents == null) {
documents = new ArrayList<SolrDoc>(2);
}
SolrDoc solrDoc = new SolrDoc();
solrDoc.document = doc;
solrDoc.commitWithin = commitWithin;
solrDoc.overwrite = overwrite;
documents.add(solrDoc);
return this;
}
public UpdateRequestExt deleteById(String id) {
if (deleteById == null) {
deleteById = new HashMap<String,Long>();
}
deleteById.put(id, null);
return this;
}
public UpdateRequestExt deleteById(String id, Long version) {
if (deleteById == null) {
deleteById = new HashMap<String,Long>();
}
deleteById.put(id, version);
return this;
}
public UpdateRequestExt deleteById(List<String> ids) {
if (deleteById == null) {
deleteById = new HashMap<String,Long>();
} else {
for (String id : ids) {
deleteById.put(id, null);
}
}
return this;
}
public UpdateRequestExt deleteByQuery(String q) {
if (deleteQuery == null) {
deleteQuery = new ArrayList<String>();
}
deleteQuery.add(q);
return this;
}
// --------------------------------------------------------------------------
// --------------------------------------------------------------------------
@Override
public Collection<ContentStream> getContentStreams() throws IOException {
return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
}
public String getXML() throws IOException {
StringWriter writer = new StringWriter();
writeXML(writer);
writer.flush();
String xml = writer.toString();
return (xml.length() > 0) ? xml : null;
}
public void writeXML(Writer writer) throws IOException {
List<List<SolrDoc>> getDocLists = getDocLists(documents);
for (List<SolrDoc> docs : getDocLists) {
if ((docs != null && docs.size() > 0)) {
SolrDoc firstDoc = docs.get(0);
int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin;
boolean overwrite = firstDoc.overwrite;
if (commitWithin > -1 || overwrite != true) {
writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">");
} else {
writer.write("<add>");
}
if (documents != null) {
for (SolrDoc doc : documents) {
if (doc != null) {
ClientUtils.writeXML(doc.document, writer);
}
}
}
writer.write("</add>");
}
}
// Add the delete commands
boolean deleteI = deleteById != null && deleteById.size() > 0;
boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
if (deleteI || deleteQ) {
writer.append("<delete>");
if (deleteI) {
for (Map.Entry<String,Long> entry : deleteById.entrySet()) {
writer.append("<id");
Long version = entry.getValue();
if (version != null) {
writer.append(" version=\"" + version + "\"");
}
writer.append(">");
XML.escapeCharData(entry.getKey(), writer);
writer.append("</id>");
}
}
if (deleteQ) {
for (String q : deleteQuery) {
writer.append("<query>");
XML.escapeCharData(q, writer);
writer.append("</query>");
}
}
writer.append("</delete>");
}
}
private List<List<SolrDoc>> getDocLists(List<SolrDoc> documents) {
List<List<SolrDoc>> docLists = new ArrayList<List<SolrDoc>>();
if (this.documents == null) {
return docLists;
}
boolean lastOverwrite = true;
int lastCommitWithin = -1;
List<SolrDoc> docList = null;
for (SolrDoc doc : this.documents) {
if (doc.overwrite != lastOverwrite
|| doc.commitWithin != lastCommitWithin || docLists.size() == 0) {
docList = new ArrayList<SolrDoc>();
docLists.add(docList);
}
docList.add(doc);
lastCommitWithin = doc.commitWithin;
lastOverwrite = doc.overwrite;
}
return docLists;
}
public Map<String,Long> getDeleteById() {
return deleteById;
}
public List<String> getDeleteQuery() {
return deleteQuery;
}
@Override
public String toString() {
return "UpdateRequestExt [documents=" + documents + ", deleteById="
+ deleteById + ", deleteQuery=" + deleteQuery + "]";
}
}

View File

@ -19,13 +19,22 @@ package org.apache.solr.client.solrj.impl;
import java.io.File; import java.io.File;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase; import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase; import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.ExternalPaths; import org.apache.solr.util.ExternalPaths;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -100,6 +109,92 @@ public class CloudSolrServerTest extends AbstractFullDistribZkTestBase {
del("*:*"); del("*:*");
commit();
SolrInputDocument doc1 = new SolrInputDocument();
doc1.addField(id, "0");
doc1.addField("a_t", "hello1");
SolrInputDocument doc2 = new SolrInputDocument();
doc2.addField(id, "2");
doc2.addField("a_t", "hello2");
UpdateRequest request = new UpdateRequest();
request.add(doc1);
request.add(doc2);
request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
// Test single threaded routed updates for UpdateRequest
NamedList response = cloudClient.request(request);
CloudSolrServer.RouteResponse rr = (CloudSolrServer.RouteResponse) response;
Map<String,LBHttpSolrServer.Req> routes = rr.getRoutes();
Iterator<Map.Entry<String,LBHttpSolrServer.Req>> it = routes.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
String url = entry.getKey();
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
.getRequest();
SolrInputDocument doc = updateRequest.getDocuments().get(0);
String id = doc.getField("id").getValue().toString();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "id:" + id);
params.add("distrib", "false");
QueryRequest queryRequest = new QueryRequest(params);
HttpSolrServer solrServer = new HttpSolrServer(url);
QueryResponse queryResponse = queryRequest.process(solrServer);
SolrDocumentList docList = queryResponse.getResults();
assertTrue(docList.getNumFound() == 1);
}
// Test the deleteById routing for UpdateRequest
UpdateRequest delRequest = new UpdateRequest();
delRequest.deleteById("0");
delRequest.deleteById("2");
delRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
cloudClient.request(delRequest);
ModifiableSolrParams qParams = new ModifiableSolrParams();
qParams.add("q", "*:*");
QueryRequest qRequest = new QueryRequest(qParams);
QueryResponse qResponse = qRequest.process(cloudClient);
SolrDocumentList docs = qResponse.getResults();
assertTrue(docs.getNumFound() == 0);
// Test Multi-Threaded routed updates for UpdateRequest
CloudSolrServer threadedClient = null;
try {
threadedClient = new CloudSolrServer(zkServer.getZkAddress());
threadedClient.setParallelUpdates(true);
threadedClient.setDefaultCollection("collection1");
response = threadedClient.request(request);
rr = (CloudSolrServer.RouteResponse) response;
routes = rr.getRoutes();
it = routes.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
String url = entry.getKey();
UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
.getRequest();
SolrInputDocument doc = updateRequest.getDocuments().get(0);
String id = doc.getField("id").getValue().toString();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", "id:" + id);
params.add("distrib", "false");
QueryRequest queryRequest = new QueryRequest(params);
HttpSolrServer solrServer = new HttpSolrServer(url);
QueryResponse queryResponse = queryRequest.process(solrServer);
SolrDocumentList docList = queryResponse.getResults();
assertTrue(docList.getNumFound() == 1);
}
} finally {
threadedClient.shutdown();
}
del("*:*");
commit();
indexr(id, 0, "a_t", "to come to the aid of their country."); indexr(id, 0, "a_t", "to come to the aid of their country.");
CloudJettyRunner shard1Leader = shardToLeaderJetty.get("shard1"); CloudJettyRunner shard1Leader = shardToLeaderJetty.get("shard1");

View File

@ -16,24 +16,23 @@
*/ */
package org.apache.solr.client.solrj.request; package org.apache.solr.client.solrj.request;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField; import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.util.FastInputStream;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
/** /**
* Test for UpdateRequestCodec * Test for UpdateRequestCodec
* *
@ -93,7 +92,7 @@ public class TestUpdateRequestCodec extends LuceneTestCase {
}; };
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler); UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
Assert.assertNull(updateUnmarshalled.getDocuments());
for (SolrInputDocument document : docs) { for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document); updateUnmarshalled.add(document);
} }
@ -144,7 +143,7 @@ public class TestUpdateRequestCodec extends LuceneTestCase {
}; };
UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler); UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
Assert.assertNull(updateUnmarshalled.getDocuments());
for (SolrInputDocument document : docs) { for (SolrInputDocument document : docs) {
updateUnmarshalled.add(document); updateUnmarshalled.add(document);
} }

View File

@ -246,6 +246,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected CloudSolrServer createCloudClient(String defaultCollection) protected CloudSolrServer createCloudClient(String defaultCollection)
throws MalformedURLException { throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
server.setParallelUpdates(random().nextBoolean());
if (defaultCollection != null) server.setDefaultCollection(defaultCollection); if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
server.getLbServer().getHttpClient().getParams() server.getLbServer().getHttpClient().getParams()
.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000); .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
@ -1696,6 +1697,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
synchronized(this) { synchronized(this) {
try { try {
commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean()); commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION); commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
commondCloudSolrServer.connect(); commondCloudSolrServer.connect();
} catch (MalformedURLException e) { } catch (MalformedURLException e) {