SOLR-508: CSV handler to use update request processors

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@639685 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2008-03-21 15:56:13 +00:00
parent 608c90bc5c
commit 2799a781cc
1 changed files with 58 additions and 43 deletions

View File

@ -20,16 +20,21 @@ package org.apache.solr.handler;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.*; import org.apache.solr.update.*;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.commons.csv.CSVStrategy; import org.apache.commons.csv.CSVStrategy;
import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVParser;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import javax.xml.stream.XMLStreamReader;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.List; import java.util.List;
import java.io.*; import java.io.*;
@ -41,28 +46,41 @@ import java.io.*;
public class CSVRequestHandler extends RequestHandlerBase { public class CSVRequestHandler extends RequestHandlerBase {
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
CSVLoader loader = new SingleThreadedCSVLoader(req); SolrParams params = req.getParams();
UpdateRequestProcessorFactory processorFactory =
req.getCore().getUpdateProcessorFactory( params.get( UpdateParams.UPDATE_PROCESSOR ) );
Iterable<ContentStream> streams = req.getContentStreams(); UpdateRequestProcessor processor = processorFactory.getInstance(req, rsp, null);
if (streams == null) {
if(!RequestHandlerUtils.handleCommit(req, rsp, false)) { try {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" ); CSVLoader loader = new SingleThreadedCSVLoader(req, processor);
Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) {
if( !RequestHandlerUtils.handleCommit(processor, params, false) ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" );
}
} }
return; else {
}
for(ContentStream stream : streams) { for(ContentStream stream : streams) {
Reader reader = stream.getReader(); Reader reader = stream.getReader();
try { try {
loader.errHeader = "CSVLoader: input=" + stream.getSourceInfo(); loader.errHeader = "CSVLoader: input=" + stream.getSourceInfo();
loader.load(reader); loader.load(reader);
} finally { } finally {
IOUtils.closeQuietly(reader); IOUtils.closeQuietly(reader);
}
}
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false );
} }
} finally {
// finish the request
processor.finish();
} }
// perhaps commit when we are done
RequestHandlerUtils.handleCommit(req, rsp, false);
} }
//////////////////////// SolrInfoMBeans methods ////////////////////// //////////////////////// SolrInfoMBeans methods //////////////////////
@ -107,8 +125,9 @@ abstract class CSVLoader {
final IndexSchema schema; final IndexSchema schema;
final SolrParams params; final SolrParams params;
final UpdateHandler handler;
final CSVStrategy strategy; final CSVStrategy strategy;
final UpdateRequestProcessor processor;
String[] fieldnames; String[] fieldnames;
SchemaField[] fields; SchemaField[] fields;
@ -126,17 +145,17 @@ abstract class CSVLoader {
* MT-safe! * MT-safe!
*/ */
private class FieldAdder { private class FieldAdder {
void add(DocumentBuilder builder, int line, int column, String val) { void add(SolrInputDocument doc, int line, int column, String val) {
if (val.length() > 0) { if (val.length() > 0) {
builder.addField(fields[column].getName(),val,1.0f); doc.addField(fields[column].getName(),val,1.0f);
} }
} }
} }
/** add zero length fields */ /** add zero length fields */
private class FieldAdderEmpty extends CSVLoader.FieldAdder { private class FieldAdderEmpty extends CSVLoader.FieldAdder {
void add(DocumentBuilder builder, int line, int column, String val) { void add(SolrInputDocument doc, int line, int column, String val) {
builder.addField(fields[column].getName(),val,1.0f); doc.addField(fields[column].getName(),val,1.0f);
} }
} }
@ -144,8 +163,8 @@ abstract class CSVLoader {
private class FieldTrimmer extends CSVLoader.FieldAdder { private class FieldTrimmer extends CSVLoader.FieldAdder {
private final CSVLoader.FieldAdder base; private final CSVLoader.FieldAdder base;
FieldTrimmer(CSVLoader.FieldAdder base) { this.base=base; } FieldTrimmer(CSVLoader.FieldAdder base) { this.base=base; }
void add(DocumentBuilder builder, int line, int column, String val) { void add(SolrInputDocument doc, int line, int column, String val) {
base.add(builder, line, column, val.trim()); base.add(doc, line, column, val.trim());
} }
} }
@ -162,9 +181,9 @@ abstract class CSVLoader {
this.to=to; this.to=to;
this.base=base; this.base=base;
} }
void add(DocumentBuilder builder, int line, int column, String val) { void add(SolrInputDocument doc, int line, int column, String val) {
if (from.equals(val)) val=to; if (from.equals(val)) val=to;
base.add(builder,line,column,val); base.add(doc,line,column,val);
} }
} }
@ -179,14 +198,14 @@ abstract class CSVLoader {
this.base = base; this.base = base;
} }
void add(DocumentBuilder builder, int line, int column, String val) { void add(SolrInputDocument doc, int line, int column, String val) {
CSVParser parser = new CSVParser(new StringReader(val), strategy); CSVParser parser = new CSVParser(new StringReader(val), strategy);
try { try {
String[] vals = parser.getLine(); String[] vals = parser.getLine();
if (vals!=null) { if (vals!=null) {
for (String v: vals) base.add(builder,line,column,v); for (String v: vals) base.add(doc,line,column,v);
} else { } else {
base.add(builder,line,column,val); base.add(doc,line,column,val);
} }
} catch (IOException e) { } catch (IOException e) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,e); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,e);
@ -197,9 +216,9 @@ abstract class CSVLoader {
String errHeader="CSVLoader:"; String errHeader="CSVLoader:";
CSVLoader(SolrQueryRequest req) { CSVLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
this.processor = processor;
this.params = req.getParams(); this.params = req.getParams();
handler = req.getCore().getUpdateHandler();
schema = req.getSchema(); schema = req.getSchema();
templateAdd = new AddUpdateCommand(); templateAdd = new AddUpdateCommand();
@ -382,35 +401,31 @@ abstract class CSVLoader {
abstract void addDoc(int line, String[] vals) throws IOException; abstract void addDoc(int line, String[] vals) throws IOException;
/** this must be MT safe... may be called concurrently from multiple threads. */ /** this must be MT safe... may be called concurrently from multiple threads. */
void doAdd(int line, String[] vals, DocumentBuilder builder, AddUpdateCommand template) throws IOException { void doAdd(int line, String[] vals, SolrInputDocument doc, AddUpdateCommand template) throws IOException {
// the line number is passed simply for error reporting in MT mode. // the line number is passed simply for error reporting in MT mode.
// first, create the lucene document // first, create the lucene document
builder.startDoc();
for (int i=0; i<vals.length; i++) { for (int i=0; i<vals.length; i++) {
if (fields[i]==null) continue; // ignore this field if (fields[i]==null) continue; // ignore this field
String val = vals[i]; String val = vals[i];
adders[i].add(builder, line, i, val); adders[i].add(doc, line, i, val);
} }
builder.endDoc();
template.doc = builder.getDoc(); template.solrDoc = doc;
handler.addDoc(template); processor.processAdd(template);
} }
} }
class SingleThreadedCSVLoader extends CSVLoader { class SingleThreadedCSVLoader extends CSVLoader {
protected DocumentBuilder builder; SingleThreadedCSVLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
super(req, processor);
SingleThreadedCSVLoader(SolrQueryRequest req) {
super(req);
builder = new DocumentBuilder(schema);
} }
void addDoc(int line, String[] vals) throws IOException { void addDoc(int line, String[] vals) throws IOException {
templateAdd.indexedId = null; templateAdd.indexedId = null;
doAdd(line, vals, builder, templateAdd); SolrInputDocument doc = new SolrInputDocument();
doAdd(line, vals, doc, templateAdd);
} }
} }