mirror of https://github.com/apache/lucene.git
SOLR-906 -- adding streaming update SolrServer
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@732087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8e25602e24
commit
cad808454e
|
@ -132,6 +132,12 @@ New Features
|
|||
For example, hl.fl=*_text will highlight all fieldnames ending with
|
||||
_text. (Lars Kotthoff via yonik)
|
||||
|
||||
29. SOLR-906: Adding a StreamingUpdateSolrServer that writes update commands to
|
||||
an open HTTP connection. If you are using solrj for bulk update requests
|
||||
you should consider switching to this implementaion. However, note that
|
||||
the error handling is not immediate as it is with the standard SolrServer.
|
||||
(ryan)
|
||||
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.httpclient.methods.RequestEntity;
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.util.ClientUtils;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.params.UpdateParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* StreamingHttpSolrServer buffers all added documents and writes them
|
||||
* into open http connections. This class is thread safe.
|
||||
*
|
||||
* Although any SolrServer request can be made with this implementation,
|
||||
* it is only recommended to use the {@link StreamingUpdateSolrServer} with
|
||||
* /update requests. The query interface is better suited for
|
||||
*
|
||||
* @version $Id: CommonsHttpSolrServer.java 724175 2008-12-07 19:07:11Z ryan $
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
|
||||
{
|
||||
static final Logger log = LoggerFactory.getLogger( StreamingUpdateSolrServer.class );
|
||||
|
||||
final BlockingQueue<UpdateRequest> queue;
|
||||
final ExecutorService scheduler = Executors.newCachedThreadPool();
|
||||
final String updateUrl = "/update";
|
||||
final Queue<Runner> runners;
|
||||
Lock lock = null; // used to block everything
|
||||
int threadCount = 1;
|
||||
|
||||
public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException {
|
||||
super( solrServerUrl );
|
||||
queue = new LinkedBlockingQueue<UpdateRequest>( queueSize );
|
||||
this.threadCount = threadCount;
|
||||
runners = new LinkedList<Runner>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a connection and sends everything...
|
||||
*/
|
||||
class Runner implements Runnable {
|
||||
final Lock lock = new ReentrantLock();
|
||||
|
||||
public void run() {
|
||||
lock.lock();
|
||||
|
||||
log.info( "starting runner: {}" , this );
|
||||
PostMethod method = null;
|
||||
try {
|
||||
RequestEntity request = new RequestEntity() {
|
||||
// we don't know the length
|
||||
public long getContentLength() { return -1; }
|
||||
public String getContentType() { return ClientUtils.TEXT_XML; }
|
||||
public boolean isRepeatable() { return false; }
|
||||
|
||||
public void writeRequest(OutputStream out) throws IOException {
|
||||
try {
|
||||
OutputStreamWriter writer = new OutputStreamWriter( out );
|
||||
writer.append( "<stream>" ); // can be anything...
|
||||
UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS );
|
||||
while( req != null ) {
|
||||
log.info( "sending: {}" , req );
|
||||
req.writeXML( writer );
|
||||
|
||||
// check for commit or optimize
|
||||
SolrParams params = req.getParams();
|
||||
if( params != null ) {
|
||||
String fmt = null;
|
||||
if( params.getBool( UpdateParams.OPTIMIZE, false ) ) {
|
||||
fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
|
||||
}
|
||||
else if( params.getBool( UpdateParams.COMMIT, false ) ) {
|
||||
fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
|
||||
}
|
||||
if( fmt != null ) {
|
||||
log.info( fmt );
|
||||
writer.write( String.format( fmt,
|
||||
params.getBool( UpdateParams.WAIT_SEARCHER, false )+"",
|
||||
params.getBool( UpdateParams.WAIT_FLUSH, false )+"") );
|
||||
}
|
||||
}
|
||||
|
||||
writer.flush();
|
||||
req = queue.poll( 250, TimeUnit.MILLISECONDS );
|
||||
}
|
||||
writer.append( "</stream>" );
|
||||
writer.flush();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
method = new PostMethod(_baseURL+updateUrl );
|
||||
method.setRequestEntity( request );
|
||||
method.setFollowRedirects( false );
|
||||
method.addRequestHeader( "User-Agent", AGENT );
|
||||
|
||||
int statusCode = getHttpClient().executeMethod(method);
|
||||
if (statusCode != HttpStatus.SC_OK) {
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append( method.getStatusLine().getReasonPhrase() );
|
||||
msg.append( "\n\n" );
|
||||
msg.append( method.getStatusText() );
|
||||
msg.append( "\n\n" );
|
||||
msg.append( "request: "+method.getURI() );
|
||||
handleError( new Exception( msg.toString() ) );
|
||||
}
|
||||
}
|
||||
catch (Throwable e) {
|
||||
handleError( e );
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
// make sure to release the connection
|
||||
method.releaseConnection();
|
||||
}
|
||||
catch( Exception ex ){}
|
||||
if( !queue.isEmpty() ) {
|
||||
run(); // run again, just in case
|
||||
}
|
||||
|
||||
// remove it from the list of running things...
|
||||
synchronized (runners) {
|
||||
runners.remove( this );
|
||||
}
|
||||
log.info( "finished: {}" , this );
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamedList<Object> request( final SolrRequest request ) throws SolrServerException, IOException
|
||||
{
|
||||
if( !(request instanceof UpdateRequest) ) {
|
||||
return super.request( request );
|
||||
}
|
||||
UpdateRequest req = (UpdateRequest)request;
|
||||
|
||||
// this happens for commit...
|
||||
if( req.getDocuments()==null || req.getDocuments().isEmpty() ) {
|
||||
blockUntilFinished();
|
||||
return super.request( request );
|
||||
}
|
||||
|
||||
SolrParams params = req.getParams();
|
||||
if( params != null ) {
|
||||
// check if it is waiting for the searcher
|
||||
if( params.getBool( UpdateParams.WAIT_SEARCHER, false ) ) {
|
||||
log.info( "blocking for commit/optimize" );
|
||||
blockUntilFinished(); // empty the queue
|
||||
return super.request( request );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if( lock != null ) {
|
||||
lock.lock(); // keep it from adding new commands while we block
|
||||
}
|
||||
try {
|
||||
queue.put( req );
|
||||
|
||||
if( runners.isEmpty()
|
||||
|| (queue.remainingCapacity() < queue.size()
|
||||
&& runners.size() < threadCount) )
|
||||
{
|
||||
synchronized( runners ) {
|
||||
Runner r = new Runner();
|
||||
scheduler.execute( r );
|
||||
runners.add( r );
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.error( "interuped", e );
|
||||
throw new IOException( e.getLocalizedMessage() );
|
||||
}
|
||||
finally {
|
||||
if( lock != null ) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// RETURN A DUMMY result
|
||||
NamedList<Object> dummy = new NamedList<Object>();
|
||||
dummy.add( "NOTE", "the request is processed in a background stream" );
|
||||
return dummy;
|
||||
}
|
||||
|
||||
public synchronized void blockUntilFinished()
|
||||
{
|
||||
if( lock == null ) {
|
||||
lock = new ReentrantLock();
|
||||
}
|
||||
lock.lock();
|
||||
|
||||
// Wait until no runners are running
|
||||
Runner runner = runners.peek();
|
||||
while( runner != null ) {
|
||||
runner.lock.lock();
|
||||
runner.lock.unlock();
|
||||
runner = runners.peek();
|
||||
}
|
||||
lock.unlock();
|
||||
lock = null;
|
||||
}
|
||||
|
||||
public void handleError( Throwable ex )
|
||||
{
|
||||
log.error( "error", ex );
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.request;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.io.Writer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -171,9 +172,22 @@ public class UpdateRequest extends SolrRequest
|
|||
public Collection<ContentStream> getContentStreams() throws IOException {
|
||||
return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
|
||||
}
|
||||
|
||||
|
||||
public String getXML() throws IOException {
|
||||
StringWriter writer = new StringWriter();
|
||||
writeXML( writer );
|
||||
writer.flush();
|
||||
|
||||
// If action is COMMIT or OPTIMIZE, it is sent with params
|
||||
String xml = writer.toString();
|
||||
//System.out.println( "SEND:"+xml );
|
||||
return (xml.length() > 0) ? xml : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since solr 1.4
|
||||
*/
|
||||
public void writeXML( Writer writer ) throws IOException {
|
||||
if( documents != null && documents.size() > 0 ) {
|
||||
if( commitWithin > 0 ) {
|
||||
writer.write("<add commitWithin=\""+commitWithin+"\">");
|
||||
|
@ -210,11 +224,6 @@ public class UpdateRequest extends SolrRequest
|
|||
}
|
||||
writer.append( "</delete>" );
|
||||
}
|
||||
|
||||
// If action is COMMIT or OPTIMIZE, it is sent with params
|
||||
String xml = writer.toString();
|
||||
//System.out.println( "SEND:"+xml );
|
||||
return (xml.length() > 0) ? xml : null;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.client.solrj.embedded;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrExampleTests;
|
||||
import org.apache.solr.client.solrj.SolrServer;
|
||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @version $Id: SolrExampleJettyTest.java 724175 2008-12-07 19:07:11Z ryan $
|
||||
* @since solr 1.3
|
||||
*/
|
||||
public class SolrExampleStreamingTest extends SolrExampleTests {
|
||||
|
||||
SolrServer server;
|
||||
JettySolrRunner jetty;
|
||||
|
||||
int port = 0;
|
||||
static final String context = "/example";
|
||||
|
||||
@Override public void setUp() throws Exception
|
||||
{
|
||||
super.setUp();
|
||||
|
||||
jetty = new JettySolrRunner( context, 0 );
|
||||
jetty.start();
|
||||
port = jetty.getLocalPort();
|
||||
System.out.println("Assigned Port#" + port);
|
||||
server = this.createNewSolrServer();
|
||||
}
|
||||
|
||||
@Override public void tearDown() throws Exception
|
||||
{
|
||||
super.tearDown();
|
||||
jetty.stop(); // stop the server
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected SolrServer getSolrServer()
|
||||
{
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SolrServer createNewSolrServer()
|
||||
{
|
||||
try {
|
||||
// setup the server...
|
||||
String url = "http://localhost:"+port+context; // smaller queue size hits locks more often
|
||||
CommonsHttpSolrServer s = new StreamingUpdateSolrServer( url, 2, 5 ) {
|
||||
@Override
|
||||
public void handleError(Throwable ex) {
|
||||
// do somethign...
|
||||
}
|
||||
};
|
||||
s.setConnectionTimeout(100); // 1/10th sec
|
||||
s.setDefaultMaxConnectionsPerHost(100);
|
||||
s.setMaxTotalConnections(100);
|
||||
return s;
|
||||
}
|
||||
catch( Exception ex ) {
|
||||
throw new RuntimeException( ex );
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue