From 0242409fae59a8c13eed14810a1aa007afbc6890 Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Mon, 11 Jun 2018 13:04:54 +1000 Subject: [PATCH] SOLR-12474: Add an UpdateRequest Object that implements RequestWriter.ContentWriter --- solr/CHANGES.txt | 3 + .../solr/cloud/BasicDistributedZkTest.java | 53 ++++++------- .../solr/cloud/SolrCloudExampleTest.java | 6 +- .../solrj/request/AbstractUpdateRequest.java | 3 +- .../solrj/request/StreamingUpdateRequest.java | 77 +++++++++++++++++++ .../solr/client/solrj/SolrExampleTests.java | 15 ++++ 6 files changed, 126 insertions(+), 31 deletions(-) create mode 100644 solr/solrj/src/java/org/apache/solr/client/solrj/request/StreamingUpdateRequest.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 993edcbe668..b66deac97f4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -72,6 +72,9 @@ New Features be subject to whatever the sort criteria is. Additionally, QEC was extensively refactored to be more extensible. (Bruno Roustant, David Smiley) +* SOLR-12474: Add an UpdateRequest Object that implements RequestWriter.ContentWriter (noble) + + Bug Fixes ---------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index 2190c8023e1..da880b47d05 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -16,6 +16,25 @@ */ package org.apache.solr.cloud; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.lang.StringUtils; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase.Slow; @@ -29,10 +48,10 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.Create; import org.apache.solr.client.solrj.request.CoreAdminRequest.Unload; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.StreamingUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.QueryResponse; @@ -60,25 +79,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - /** * This test simply does a bunch of basic things in solrcloud mode and asserts things @@ -750,12 +750,13 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { throws SolrServerException, IOException { log.info("### STARTING testNumberOfCommitsWithCommitAfterAdd"); long startCommits = getNumCommits((HttpSolrClient) clients.get(0)); - - ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update"); - up.addFile(getFile("books_numeric_ids.csv"), "application/csv"); - up.setCommitWithin(900000); - up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true); - NamedList result = clients.get(0).request(up); + + + NamedList result = clients.get(0).request( + new StreamingUpdateRequest("/update", + getFile("books_numeric_ids.csv"), "application/csv") + .setCommitWithin(900000) + .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true)); long endCommits = getNumCommits((HttpSolrClient) clients.get(0)); diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudExampleTest.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudExampleTest.java index 9a7ca695374..60714e2129c 100644 --- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudExampleTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudExampleTest.java @@ -35,7 +35,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.util.EntityUtils; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; +import org.apache.solr.client.solrj.request.StreamingUpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; @@ -141,10 +141,8 @@ public class SolrCloudExampleTest extends AbstractFullDistribZkTestBase { expectedXmlFileCount, xmlFiles.size()); for (File xml : xmlFiles) { - ContentStreamUpdateRequest req = new ContentStreamUpdateRequest("/update"); - req.addFile(xml, "application/xml"); log.info("POSTing "+xml.getAbsolutePath()); - cloudClient.request(req); + cloudClient.request(new StreamingUpdateRequest("/update",xml,"application/xml")); } cloudClient.commit(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java index d1675fc3fa8..cb9e74e81c2 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/AbstractUpdateRequest.java @@ -135,8 +135,9 @@ public abstract class AbstractUpdateRequest extends SolrRequest return commitWithin; } - public void setCommitWithin(int commitWithin) { + public AbstractUpdateRequest setCommitWithin(int commitWithin) { this.commitWithin = commitWithin; + return this; } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/StreamingUpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/StreamingUpdateRequest.java new file mode 100644 index 00000000000..381f318ab30 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/StreamingUpdateRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.request; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.io.IOUtils; + +/* A simple update request which streams content to the server + + */ +public class StreamingUpdateRequest extends AbstractUpdateRequest { + + private final RequestWriter.ContentWriter contentWriter; + + public StreamingUpdateRequest(String path, RequestWriter.ContentWriter contentWriter) { + super(METHOD.POST, path); + this.contentWriter = contentWriter; + } + + public StreamingUpdateRequest(String path, String content, String contentType) { + this(path, new RequestWriter.ContentWriter() { + @Override + public void write(OutputStream os) throws IOException { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + + @Override + public String getContentType() { + return contentType; + } + + }); + } + + public StreamingUpdateRequest(String path, File f, String contentType) { + this(path, new RequestWriter.ContentWriter() { + @Override + public void write(OutputStream os) throws IOException { + try (InputStream is = new FileInputStream(f)) { + IOUtils.copy(is, os); + } + } + + @Override + public String getContentType() { + return contentType; + } + }); + } + + @Override + public RequestWriter.ContentWriter getContentWriter(String expectedType) { + return contentWriter; + } + +} diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java index 63bd72c6fa3..bf0d245d083 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleTests.java @@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION; import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.request.LukeRequest; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.StreamingUpdateRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.FacetField; import org.apache.solr.client.solrj.response.FieldStatsInfo; @@ -655,6 +656,20 @@ abstract public class SolrExampleTests extends SolrExampleTestsBase rsp = client.query( new SolrQuery( "*:*") ); Assert.assertEquals( 10, rsp.getResults().getNumFound() ); } + @Test + public void testStreamingRequest() throws Exception { + SolrClient client = getSolrClient(); + client.deleteByQuery("*:*");// delete everything! + client.commit(); + QueryResponse rsp = client.query( new SolrQuery( "*:*") ); + Assert.assertEquals(0, rsp.getResults().getNumFound()); + NamedList result = client.request(new StreamingUpdateRequest("/update", + getFile("solrj/books.csv"), "application/csv") + .setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true)); + assertNotNull("Couldn't upload books.csv", result); + rsp = client.query( new SolrQuery( "*:*") ); + Assert.assertEquals( 10, rsp.getResults().getNumFound() ); + } @Test public void testMultiContentStreamRequest() throws Exception {