SOLR-3139: send UpdateRequest.getParams() as HTTP request params

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1325884 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sami Siren 2012-04-13 18:04:16 +00:00
parent 24d47c6c11
commit 91b6b379cd
3 changed files with 58 additions and 10 deletions

View File

@ -295,6 +295,8 @@ Optimizations
Bug Fixes Bug Fixes
---------------------- ----------------------
* SOLR-3139: Make ConcurrentUpdateSolrServer send UpdateRequest.getParams()
as HTTP request params (siren)
* SOLR-3165: Cannot use DIH in Solrcloud + Zookeeper (Alexey Serba, * SOLR-3165: Cannot use DIH in Solrcloud + Zookeeper (Alexey Serba,
Mark Miller, siren) Mark Miller, siren)

View File

@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
@ -54,6 +56,10 @@ import org.slf4j.LoggerFactory;
* ConcurrentUpdateSolrServer buffers all added documents and writes * ConcurrentUpdateSolrServer buffers all added documents and writes
* them into open HTTP connections. This class is thread safe. * them into open HTTP connections. This class is thread safe.
* *
* Params from {@link UpdateRequest} are converted to http request
* parameters. When params change between UpdateRequests a new HTTP
* request is started.
*
* Although any SolrServer request can be made with this implementation, it is * Although any SolrServer request can be made with this implementation, it is
* only recommended to use ConcurrentUpdateSolrServer with /update * only recommended to use ConcurrentUpdateSolrServer with /update
* requests. The class {@link HttpSolrServer} is better suited for the * requests. The class {@link HttpSolrServer} is better suited for the
@ -66,7 +72,6 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
private HttpSolrServer server; private HttpSolrServer server;
final BlockingQueue<UpdateRequest> queue; final BlockingQueue<UpdateRequest> queue;
final ExecutorService scheduler = Executors.newCachedThreadPool(); final ExecutorService scheduler = Executors.newCachedThreadPool();
final String updateUrl = "/update";
final Queue<Runner> runners; final Queue<Runner> runners;
volatile CountDownLatch lock = null; // used to block everything volatile CountDownLatch lock = null; // used to block everything
final int threadCount; final int threadCount;
@ -124,19 +129,28 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
if (updateRequest == null) if (updateRequest == null)
break; break;
final boolean isXml = ClientUtils.TEXT_XML.equals(server.requestWriter
.getUpdateContentType());
final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
EntityTemplate template = new EntityTemplate(new ContentProducer() { EntityTemplate template = new EntityTemplate(new ContentProducer() {
public void writeTo(OutputStream out) throws IOException { public void writeTo(OutputStream out) throws IOException {
try { try {
if (ClientUtils.TEXT_XML.equals(server.requestWriter if (isXml) {
.getUpdateContentType())) {
out.write("<stream>".getBytes("UTF-8")); // can be anything out.write("<stream>".getBytes("UTF-8")); // can be anything
} }
UpdateRequest req = updateRequest; UpdateRequest req = updateRequest;
while (req != null) { while (req != null) {
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(req); // params are different, push back to queue
break;
}
server.requestWriter.write(req, out); server.requestWriter.write(req, out);
if (ClientUtils.TEXT_XML.equals(server.requestWriter if (isXml) {
.getUpdateContentType())) {
// check for commit or optimize // check for commit or optimize
SolrParams params = req.getParams(); SolrParams params = req.getParams();
if (params != null) { if (params != null) {
@ -158,8 +172,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
out.flush(); out.flush();
req = queue.poll(250, TimeUnit.MILLISECONDS); req = queue.poll(250, TimeUnit.MILLISECONDS);
} }
if (ClientUtils.TEXT_XML.equals(server.requestWriter if (isXml) {
.getUpdateContentType())) {
out.write("</stream>".getBytes("UTF-8")); out.write("</stream>".getBytes("UTF-8"));
} }
@ -168,11 +181,17 @@ public class ConcurrentUpdateSolrServer extends SolrServer {
} }
} }
}); });
// The parser 'wt=' and 'version=' params are used instead of the
// original params
ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
requestParams.set(CommonParams.WT, server.parser.getWriterType());
requestParams.set(CommonParams.VERSION, server.parser.getVersion());
String path = ClientUtils.TEXT_XML.equals(server.requestWriter final String path = isXml ? "/update" : "/update/javabin";
.getUpdateContentType()) ? "/update" : "/update/javabin";
method = new HttpPost(server.getBaseURL() + path); method = new HttpPost(server.getBaseURL() + path
+ ClientUtils.toQueryString(requestParams, false));
method.setEntity(template); method.setEntity(template);
method.addHeader("User-Agent", HttpSolrServer.AGENT); method.addHeader("User-Agent", HttpSolrServer.AGENT);
response = server.getHttpClient().execute(method); response = server.getHttpClient().execute(method);

View File

@ -553,6 +553,33 @@ abstract public class SolrExampleTests extends SolrJettyTestBase
assertEquals( 10, ((Integer)out1.get( "ten" )).intValue() ); assertEquals( 10, ((Integer)out1.get( "ten" )).intValue() );
} }
@Test
public void testUpdateRequestWithParameters() throws Exception {
SolrServer server1 = createNewSolrServer();
System.out.println("server:" + server1.getClass().toString());
server1.deleteByQuery( "*:*" );
server1.commit();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "id1");
UpdateRequest req = new UpdateRequest();
req.setParam("overwrite", "false");
req.add(doc);
server1.request(req);
server1.request(req);
server1.commit();
SolrQuery query = new SolrQuery();
query.setQuery("*:*");
QueryResponse rsp = server1.query(query);
SolrDocumentList out = rsp.getResults();
assertEquals(2, out.getNumFound());
}
@Test @Test
public void testContentStreamRequest() throws Exception { public void testContentStreamRequest() throws Exception {
SolrServer server = getSolrServer(); SolrServer server = getSolrServer();