diff --git a/CHANGES.txt b/CHANGES.txt index de1f4eb6a2b..e5b816fca75 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -132,6 +132,12 @@ New Features For example, hl.fl=*_text will highlight all fieldnames ending with _text. (Lars Kotthoff via yonik) +29. SOLR-906: Adding a StreamingUpdateSolrServer that writes update commands to + an open HTTP connection. If you are using solrj for bulk update requests + you should consider switching to this implementaion. However, note that + the error handling is not immediate as it is with the standard SolrServer. + (ryan) + Optimizations ---------------------- diff --git a/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java b/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java new file mode 100644 index 00000000000..d56c9e2bed2 --- /dev/null +++ b/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.impl; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.MalformedURLException; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.RequestEntity; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StreamingHttpSolrServer buffers all added documents and writes them + * into open http connections. This class is thread safe. + * + * Although any SolrServer request can be made with this implementation, + * it is only recommended to use the {@link StreamingUpdateSolrServer} with + * /update requests. The query interface is better suited for + * + * @version $Id: CommonsHttpSolrServer.java 724175 2008-12-07 19:07:11Z ryan $ + * @since solr 1.4 + */ +public class StreamingUpdateSolrServer extends CommonsHttpSolrServer +{ + static final Logger log = LoggerFactory.getLogger( StreamingUpdateSolrServer.class ); + + final BlockingQueue queue; + final ExecutorService scheduler = Executors.newCachedThreadPool(); + final String updateUrl = "/update"; + final Queue runners; + Lock lock = null; // used to block everything + int threadCount = 1; + + public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException { + super( solrServerUrl ); + queue = new LinkedBlockingQueue( queueSize ); + this.threadCount = threadCount; + runners = new LinkedList(); + } + + /** + * Opens a connection and sends everything... + */ + class Runner implements Runnable { + final Lock lock = new ReentrantLock(); + + public void run() { + lock.lock(); + + log.info( "starting runner: {}" , this ); + PostMethod method = null; + try { + RequestEntity request = new RequestEntity() { + // we don't know the length + public long getContentLength() { return -1; } + public String getContentType() { return ClientUtils.TEXT_XML; } + public boolean isRepeatable() { return false; } + + public void writeRequest(OutputStream out) throws IOException { + try { + OutputStreamWriter writer = new OutputStreamWriter( out ); + writer.append( "" ); // can be anything... + UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS ); + while( req != null ) { + log.info( "sending: {}" , req ); + req.writeXML( writer ); + + // check for commit or optimize + SolrParams params = req.getParams(); + if( params != null ) { + String fmt = null; + if( params.getBool( UpdateParams.OPTIMIZE, false ) ) { + fmt = ""; + } + else if( params.getBool( UpdateParams.COMMIT, false ) ) { + fmt = ""; + } + if( fmt != null ) { + log.info( fmt ); + writer.write( String.format( fmt, + params.getBool( UpdateParams.WAIT_SEARCHER, false )+"", + params.getBool( UpdateParams.WAIT_FLUSH, false )+"") ); + } + } + + writer.flush(); + req = queue.poll( 250, TimeUnit.MILLISECONDS ); + } + writer.append( "" ); + writer.flush(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }; + + method = new PostMethod(_baseURL+updateUrl ); + method.setRequestEntity( request ); + method.setFollowRedirects( false ); + method.addRequestHeader( "User-Agent", AGENT ); + + int statusCode = getHttpClient().executeMethod(method); + if (statusCode != HttpStatus.SC_OK) { + StringBuilder msg = new StringBuilder(); + msg.append( method.getStatusLine().getReasonPhrase() ); + msg.append( "\n\n" ); + msg.append( method.getStatusText() ); + msg.append( "\n\n" ); + msg.append( "request: "+method.getURI() ); + handleError( new Exception( msg.toString() ) ); + } + } + catch (Throwable e) { + handleError( e ); + } + finally { + try { + // make sure to release the connection + method.releaseConnection(); + } + catch( Exception ex ){} + if( !queue.isEmpty() ) { + run(); // run again, just in case + } + + // remove it from the list of running things... + synchronized (runners) { + runners.remove( this ); + } + log.info( "finished: {}" , this ); + lock.unlock(); + } + } + } + + @Override + public NamedList request( final SolrRequest request ) throws SolrServerException, IOException + { + if( !(request instanceof UpdateRequest) ) { + return super.request( request ); + } + UpdateRequest req = (UpdateRequest)request; + + // this happens for commit... + if( req.getDocuments()==null || req.getDocuments().isEmpty() ) { + blockUntilFinished(); + return super.request( request ); + } + + SolrParams params = req.getParams(); + if( params != null ) { + // check if it is waiting for the searcher + if( params.getBool( UpdateParams.WAIT_SEARCHER, false ) ) { + log.info( "blocking for commit/optimize" ); + blockUntilFinished(); // empty the queue + return super.request( request ); + } + } + + + if( lock != null ) { + lock.lock(); // keep it from adding new commands while we block + } + try { + queue.put( req ); + + if( runners.isEmpty() + || (queue.remainingCapacity() < queue.size() + && runners.size() < threadCount) ) + { + synchronized( runners ) { + Runner r = new Runner(); + scheduler.execute( r ); + runners.add( r ); + } + } + } + catch (InterruptedException e) { + log.error( "interuped", e ); + throw new IOException( e.getLocalizedMessage() ); + } + finally { + if( lock != null ) { + lock.unlock(); + } + } + + // RETURN A DUMMY result + NamedList dummy = new NamedList(); + dummy.add( "NOTE", "the request is processed in a background stream" ); + return dummy; + } + + public synchronized void blockUntilFinished() + { + if( lock == null ) { + lock = new ReentrantLock(); + } + lock.lock(); + + // Wait until no runners are running + Runner runner = runners.peek(); + while( runner != null ) { + runner.lock.lock(); + runner.lock.unlock(); + runner = runners.peek(); + } + lock.unlock(); + lock = null; + } + + public void handleError( Throwable ex ) + { + log.error( "error", ex ); + } +} diff --git a/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java b/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java index b55a3db0dbd..d82b05ec23f 100644 --- a/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java +++ b/src/solrj/org/apache/solr/client/solrj/request/UpdateRequest.java @@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.request; import java.io.IOException; import java.io.StringWriter; +import java.io.Writer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -171,9 +172,22 @@ public class UpdateRequest extends SolrRequest public Collection getContentStreams() throws IOException { return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML ); } - + public String getXML() throws IOException { StringWriter writer = new StringWriter(); + writeXML( writer ); + writer.flush(); + + // If action is COMMIT or OPTIMIZE, it is sent with params + String xml = writer.toString(); + //System.out.println( "SEND:"+xml ); + return (xml.length() > 0) ? xml : null; + } + + /** + * @since solr 1.4 + */ + public void writeXML( Writer writer ) throws IOException { if( documents != null && documents.size() > 0 ) { if( commitWithin > 0 ) { writer.write(""); @@ -210,11 +224,6 @@ public class UpdateRequest extends SolrRequest } writer.append( "" ); } - - // If action is COMMIT or OPTIMIZE, it is sent with params - String xml = writer.toString(); - //System.out.println( "SEND:"+xml ); - return (xml.length() > 0) ? xml : null; } diff --git a/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java b/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java new file mode 100644 index 00000000000..0820d7b9060 --- /dev/null +++ b/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.embedded; + +import org.apache.solr.client.solrj.SolrExampleTests; +import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer; + + +/** + * + * @version $Id: SolrExampleJettyTest.java 724175 2008-12-07 19:07:11Z ryan $ + * @since solr 1.3 + */ +public class SolrExampleStreamingTest extends SolrExampleTests { + + SolrServer server; + JettySolrRunner jetty; + + int port = 0; + static final String context = "/example"; + + @Override public void setUp() throws Exception + { + super.setUp(); + + jetty = new JettySolrRunner( context, 0 ); + jetty.start(); + port = jetty.getLocalPort(); + System.out.println("Assigned Port#" + port); + server = this.createNewSolrServer(); + } + + @Override public void tearDown() throws Exception + { + super.tearDown(); + jetty.stop(); // stop the server + } + + + @Override + protected SolrServer getSolrServer() + { + return server; + } + + @Override + protected SolrServer createNewSolrServer() + { + try { + // setup the server... + String url = "http://localhost:"+port+context; // smaller queue size hits locks more often + CommonsHttpSolrServer s = new StreamingUpdateSolrServer( url, 2, 5 ) { + @Override + public void handleError(Throwable ex) { + // do somethign... + } + }; + s.setConnectionTimeout(100); // 1/10th sec + s.setDefaultMaxConnectionsPerHost(100); + s.setMaxTotalConnections(100); + return s; + } + catch( Exception ex ) { + throw new RuntimeException( ex ); + } + } +}