SOLR-1565 -- StreamingUpdateSolrServer supports RequestWriter API and therefore, javabin update format

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1205342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2011-11-23 09:39:54 +00:00
parent 28c464c658
commit db391dfc79
3 changed files with 89 additions and 38 deletions

View File

@ -380,6 +380,9 @@ New Features
* SOLR-2904: BinaryUpdateRequestHandler should be able to accept multiple update requests from
a stream (shalin)
* SOLR-1565: StreamingUpdateSolrServer supports RequestWriter API and therefore, javabin update
format (shalin)
================== 3.5.0 ==================
New Features

View File

@ -19,10 +19,8 @@ package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -33,10 +31,13 @@ import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,65 +100,70 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
log.info( "starting runner: {}" , this );
PostMethod method = null;
try {
do {
while (!queue.isEmpty()) {
try {
final UpdateRequest updateRequest = queue.poll(250, TimeUnit.MILLISECONDS);
if (updateRequest == null) break;
RequestEntity request = new RequestEntity() {
// we don't know the length
public long getContentLength() { return -1; }
public String getContentType() { return ClientUtils.TEXT_XML; }
public String getContentType() { return requestWriter.getUpdateContentType(); }
public boolean isRepeatable() { return false; }
public void writeRequest(OutputStream out) throws IOException {
try {
OutputStreamWriter writer = new OutputStreamWriter(out, "UTF-8");
writer.append( "<stream>" ); // can be anything...
UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS );
while( req != null ) {
log.debug( "sending: {}" , req );
req.writeXML( writer );
// check for commit or optimize
SolrParams params = req.getParams();
if( params != null ) {
String fmt = null;
if( params.getBool( UpdateParams.OPTIMIZE, false ) ) {
fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
}
else if( params.getBool( UpdateParams.COMMIT, false ) ) {
fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
}
if( fmt != null ) {
log.info( fmt );
writer.write( String.format( fmt,
params.getBool( UpdateParams.WAIT_SEARCHER, false )+"") );
if (ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType())) {
out.write("<stream>".getBytes("UTF-8")); // can be anything
}
UpdateRequest req = updateRequest;
while (req != null) {
requestWriter.write(req, out);
if (ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType())) {
// check for commit or optimize
SolrParams params = req.getParams();
if (params != null) {
String fmt = null;
if (params.getBool(UpdateParams.OPTIMIZE, false)) {
fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
} else if (params.getBool(UpdateParams.COMMIT, false)) {
fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
}
if (fmt != null) {
byte[] content = String.format(fmt,
params.getBool(UpdateParams.WAIT_SEARCHER, false) + "").getBytes("UTF-8");
out.write(content);
}
}
}
writer.flush();
req = queue.poll( 250, TimeUnit.MILLISECONDS );
out.flush();
req = queue.poll(250, TimeUnit.MILLISECONDS);
}
writer.append( "</stream>" );
writer.flush();
}
catch (InterruptedException e) {
if (ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType())) {
out.write("</stream>".getBytes("UTF-8"));
}
out.flush();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
method = new PostMethod(_baseURL+updateUrl );
String path = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType()) ? "/update" : "/update/javabin";
method = new PostMethod(_baseURL+path );
method.setRequestEntity( request );
method.setFollowRedirects( false );
method.addRequestHeader( "User-Agent", AGENT );
int statusCode = getHttpClient().executeMethod(method);
log.info("Status for: " + updateRequest.getDocuments().get(0).getFieldValue("id") + " is " + statusCode);
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append( method.getStatusLine().getReasonPhrase() );
msg.append( "\n\n" );
msg.append( method.getStatusText() );
msg.append( "\n\n" );
msg.append( "request: "+method.getURI() );
msg.append("request: ").append(method.getURI());
handleError( new Exception( msg.toString() ) );
}
} finally {
@ -168,7 +174,7 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
}
catch( Exception ex ){}
}
} while( ! queue.isEmpty());
}
}
catch (Throwable e) {
handleError( e );

View File

@ -0,0 +1,42 @@
package org.apache.solr.client.solrj.embedded;
import org.apache.solr.client.solrj.SolrExampleTests;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
import org.apache.solr.util.ExternalPaths;
import org.junit.BeforeClass;
public class SolrExampleStreamingBinaryTest extends SolrExampleTests {
@BeforeClass
public static void beforeTest() throws Exception {
createJetty(ExternalPaths.EXAMPLE_HOME, null, null);
}
@Override
public SolrServer createNewSolrServer()
{
try {
// setup the server...
String url = "http://localhost:"+port+context;
// smaller queue size hits locks more often
CommonsHttpSolrServer s = new StreamingUpdateSolrServer( url, 2, 5 ) {
@Override
public void handleError(Throwable ex) {
ex.printStackTrace();
}
};
s.setConnectionTimeout(100); // 1/10th sec
s.setDefaultMaxConnectionsPerHost(100);
s.setMaxTotalConnections(100);
s.setParser(new BinaryResponseParser());
s.setRequestWriter(new BinaryRequestWriter());
return s;
}
catch( Exception ex ) {
throw new RuntimeException( ex );
}
}
}