mirror of https://github.com/apache/lucene.git
SOLR-12474: Add an UpdateRequest Object that implements RequestWriter.ContentWriter
This commit is contained in:
parent
b7e9fb43d0
commit
0242409fae
|
@ -72,6 +72,9 @@ New Features
|
|||
be subject to whatever the sort criteria is. Additionally, QEC was extensively refactored to be more extensible.
|
||||
(Bruno Roustant, David Smiley)
|
||||
|
||||
* SOLR-12474: Add an UpdateRequest Object that implements RequestWriter.ContentWriter (noble)
|
||||
|
||||
|
||||
Bug Fixes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -16,6 +16,25 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
|
@ -29,10 +48,10 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
|||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.Unload;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
|
@ -60,25 +79,6 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
/**
|
||||
* This test simply does a bunch of basic things in solrcloud mode and asserts things
|
||||
|
@ -750,12 +750,13 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
|
|||
throws SolrServerException, IOException {
|
||||
log.info("### STARTING testNumberOfCommitsWithCommitAfterAdd");
|
||||
long startCommits = getNumCommits((HttpSolrClient) clients.get(0));
|
||||
|
||||
ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update");
|
||||
up.addFile(getFile("books_numeric_ids.csv"), "application/csv");
|
||||
up.setCommitWithin(900000);
|
||||
up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
|
||||
NamedList<Object> result = clients.get(0).request(up);
|
||||
|
||||
|
||||
NamedList<Object> result = clients.get(0).request(
|
||||
new StreamingUpdateRequest("/update",
|
||||
getFile("books_numeric_ids.csv"), "application/csv")
|
||||
.setCommitWithin(900000)
|
||||
.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true));
|
||||
|
||||
long endCommits = getNumCommits((HttpSolrClient) clients.get(0));
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.apache.http.client.methods.HttpGet;
|
|||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
|
@ -141,10 +141,8 @@ public class SolrCloudExampleTest extends AbstractFullDistribZkTestBase {
|
|||
expectedXmlFileCount, xmlFiles.size());
|
||||
|
||||
for (File xml : xmlFiles) {
|
||||
ContentStreamUpdateRequest req = new ContentStreamUpdateRequest("/update");
|
||||
req.addFile(xml, "application/xml");
|
||||
log.info("POSTing "+xml.getAbsolutePath());
|
||||
cloudClient.request(req);
|
||||
cloudClient.request(new StreamingUpdateRequest("/update",xml,"application/xml"));
|
||||
}
|
||||
cloudClient.commit();
|
||||
|
||||
|
|
|
@ -135,8 +135,9 @@ public abstract class AbstractUpdateRequest extends SolrRequest<UpdateResponse>
|
|||
return commitWithin;
|
||||
}
|
||||
|
||||
public void setCommitWithin(int commitWithin) {
|
||||
public AbstractUpdateRequest setCommitWithin(int commitWithin) {
|
||||
this.commitWithin = commitWithin;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.request;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
/* A simple update request which streams content to the server
|
||||
|
||||
*/
|
||||
public class StreamingUpdateRequest extends AbstractUpdateRequest {
|
||||
|
||||
private final RequestWriter.ContentWriter contentWriter;
|
||||
|
||||
public StreamingUpdateRequest(String path, RequestWriter.ContentWriter contentWriter) {
|
||||
super(METHOD.POST, path);
|
||||
this.contentWriter = contentWriter;
|
||||
}
|
||||
|
||||
public StreamingUpdateRequest(String path, String content, String contentType) {
|
||||
this(path, new RequestWriter.ContentWriter() {
|
||||
@Override
|
||||
public void write(OutputStream os) throws IOException {
|
||||
os.write(content.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentType() {
|
||||
return contentType;
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
public StreamingUpdateRequest(String path, File f, String contentType) {
|
||||
this(path, new RequestWriter.ContentWriter() {
|
||||
@Override
|
||||
public void write(OutputStream os) throws IOException {
|
||||
try (InputStream is = new FileInputStream(f)) {
|
||||
IOUtils.copy(is, os);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentType() {
|
||||
return contentType;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public RequestWriter.ContentWriter getContentWriter(String expectedType) {
|
||||
return contentWriter;
|
||||
}
|
||||
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest.ACTION;
|
|||
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.LukeRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.StreamingUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.FacetField;
|
||||
import org.apache.solr.client.solrj.response.FieldStatsInfo;
|
||||
|
@ -655,6 +656,20 @@ abstract public class SolrExampleTests extends SolrExampleTestsBase
|
|||
rsp = client.query( new SolrQuery( "*:*") );
|
||||
Assert.assertEquals( 10, rsp.getResults().getNumFound() );
|
||||
}
|
||||
@Test
|
||||
public void testStreamingRequest() throws Exception {
|
||||
SolrClient client = getSolrClient();
|
||||
client.deleteByQuery("*:*");// delete everything!
|
||||
client.commit();
|
||||
QueryResponse rsp = client.query( new SolrQuery( "*:*") );
|
||||
Assert.assertEquals(0, rsp.getResults().getNumFound());
|
||||
NamedList<Object> result = client.request(new StreamingUpdateRequest("/update",
|
||||
getFile("solrj/books.csv"), "application/csv")
|
||||
.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true));
|
||||
assertNotNull("Couldn't upload books.csv", result);
|
||||
rsp = client.query( new SolrQuery( "*:*") );
|
||||
Assert.assertEquals( 10, rsp.getResults().getNumFound() );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiContentStreamRequest() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue