diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 973c518fbd4..2f9f5d34943 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -295,6 +295,8 @@ Optimizations Bug Fixes ---------------------- +* SOLR-3139: Make ConcurrentUpdateSolrServer send UpdateRequest.getParams() + as HTTP request params (siren) * SOLR-3165: Cannot use DIH in Solrcloud + Zookeeper (Alexey Serba, Mark Miller, siren) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java index e2acc2dc585..438f5182670 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java @@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.NamedList; @@ -54,6 +56,10 @@ import org.slf4j.LoggerFactory; * ConcurrentUpdateSolrServer buffers all added documents and writes * them into open HTTP connections. This class is thread safe. * + * Params from {@link UpdateRequest} are converted to http request + * parameters. When params change between UpdateRequests a new HTTP + * request is started. + * * Although any SolrServer request can be made with this implementation, it is * only recommended to use ConcurrentUpdateSolrServer with /update * requests. The class {@link HttpSolrServer} is better suited for the @@ -66,7 +72,6 @@ public class ConcurrentUpdateSolrServer extends SolrServer { private HttpSolrServer server; final BlockingQueue queue; final ExecutorService scheduler = Executors.newCachedThreadPool(); - final String updateUrl = "/update"; final Queue runners; volatile CountDownLatch lock = null; // used to block everything final int threadCount; @@ -124,19 +129,28 @@ public class ConcurrentUpdateSolrServer extends SolrServer { if (updateRequest == null) break; + final boolean isXml = ClientUtils.TEXT_XML.equals(server.requestWriter + .getUpdateContentType()); + + final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams()); + EntityTemplate template = new EntityTemplate(new ContentProducer() { public void writeTo(OutputStream out) throws IOException { try { - if (ClientUtils.TEXT_XML.equals(server.requestWriter - .getUpdateContentType())) { + if (isXml) { out.write("".getBytes("UTF-8")); // can be anything } UpdateRequest req = updateRequest; while (req != null) { + SolrParams currentParams = new ModifiableSolrParams(req.getParams()); + if (!origParams.toNamedList().equals(currentParams.toNamedList())) { + queue.add(req); // params are different, push back to queue + break; + } + server.requestWriter.write(req, out); - if (ClientUtils.TEXT_XML.equals(server.requestWriter - .getUpdateContentType())) { + if (isXml) { // check for commit or optimize SolrParams params = req.getParams(); if (params != null) { @@ -158,8 +172,7 @@ public class ConcurrentUpdateSolrServer extends SolrServer { out.flush(); req = queue.poll(250, TimeUnit.MILLISECONDS); } - if (ClientUtils.TEXT_XML.equals(server.requestWriter - .getUpdateContentType())) { + if (isXml) { out.write("".getBytes("UTF-8")); } @@ -168,11 +181,17 @@ public class ConcurrentUpdateSolrServer extends SolrServer { } } }); + + // The parser 'wt=' and 'version=' params are used instead of the + // original params + ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams); + requestParams.set(CommonParams.WT, server.parser.getWriterType()); + requestParams.set(CommonParams.VERSION, server.parser.getVersion()); - String path = ClientUtils.TEXT_XML.equals(server.requestWriter - .getUpdateContentType()) ? "/update" : "/update/javabin"; + final String path = isXml ? "/update" : "/update/javabin"; - method = new HttpPost(server.getBaseURL() + path); + method = new HttpPost(server.getBaseURL() + path + + ClientUtils.toQueryString(requestParams, false)); method.setEntity(template); method.addHeader("User-Agent", HttpSolrServer.AGENT); response = server.getHttpClient().execute(method); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java index 64059303027..879d666e2ed 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java @@ -553,6 +553,33 @@ abstract public class SolrExampleTests extends SolrJettyTestBase assertEquals( 10, ((Integer)out1.get( "ten" )).intValue() ); } + @Test + public void testUpdateRequestWithParameters() throws Exception { + SolrServer server1 = createNewSolrServer(); + + System.out.println("server:" + server1.getClass().toString()); + + server1.deleteByQuery( "*:*" ); + server1.commit(); + + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "id1"); + + UpdateRequest req = new UpdateRequest(); + req.setParam("overwrite", "false"); + req.add(doc); + server1.request(req); + server1.request(req); + server1.commit(); + + SolrQuery query = new SolrQuery(); + query.setQuery("*:*"); + QueryResponse rsp = server1.query(query); + + SolrDocumentList out = rsp.getResults(); + assertEquals(2, out.getNumFound()); + } + @Test public void testContentStreamRequest() throws Exception { SolrServer server = getSolrServer();