SOLR-852: Refactor common code in some update request handlers that deal with ContentStreams

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@713761 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Grant Ingersoll 2008-11-13 17:51:08 +00:00
parent 57c70675d3
commit 4d7731fc90
7 changed files with 513 additions and 402 deletions

View File

@ -118,6 +118,8 @@ Other Changes
3. DumpRequestHandler (/debug/dump): changed 'fieldName' to 'sourceInfo'. (ehatcher) 3. DumpRequestHandler (/debug/dump): changed 'fieldName' to 'sourceInfo'. (ehatcher)
4. SOLR-852: Refactored common code in CSVRequestHandler and XMLUpdateRequestHandler (gsingers, ehatcher)
Build Build
---------------------- ----------------------

View File

@ -22,20 +22,16 @@ import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField; import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.*; import org.apache.solr.update.*;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.commons.csv.CSVStrategy; import org.apache.commons.csv.CSVStrategy;
import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVParser;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import javax.xml.stream.XMLStreamReader;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.List; import java.util.List;
import java.io.*; import java.io.*;
@ -44,45 +40,10 @@ import java.io.*;
* @version $Id$ * @version $Id$
*/ */
public class CSVRequestHandler extends RequestHandlerBase { public class CSVRequestHandler extends ContentStreamHandlerBase {
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { protected ContentStreamLoader newLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
SolrParams params = req.getParams(); return new SingleThreadedCSVLoader(req, processor);
UpdateRequestProcessorChain processorChain =
req.getCore().getUpdateProcessingChain( params.get( UpdateParams.UPDATE_PROCESSOR ) );
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
try {
CSVLoader loader = new SingleThreadedCSVLoader(req, processor);
Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) {
if (!RequestHandlerUtils.handleCommit(processor, params, false) && !RequestHandlerUtils.handleRollback(processor, params, false)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
}
}
else {
for(ContentStream stream : streams) {
Reader reader = stream.getReader();
try {
loader.errHeader = "CSVLoader: input=" + stream.getSourceInfo();
loader.load(reader);
} finally {
IOUtils.closeQuietly(reader);
}
}
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false );
RequestHandlerUtils.handleRollback(processor, params, false );
}
} finally {
// finish the request
processor.finish();
}
} }
//////////////////////// SolrInfoMBeans methods ////////////////////// //////////////////////// SolrInfoMBeans methods //////////////////////
@ -108,7 +69,7 @@ public class CSVRequestHandler extends RequestHandlerBase {
} }
abstract class CSVLoader { abstract class CSVLoader extends ContentStreamLoader {
static String SEPARATOR="separator"; static String SEPARATOR="separator";
static String FIELDNAMES="fieldnames"; static String FIELDNAMES="fieldnames";
static String HEADER="header"; static String HEADER="header";
@ -140,6 +101,7 @@ abstract class CSVLoader {
final AddUpdateCommand templateAdd; final AddUpdateCommand templateAdd;
/** Add a field to a document unless it's zero length. /** Add a field to a document unless it's zero length.
* The FieldAdder hierarchy handles all the complexity of * The FieldAdder hierarchy handles all the complexity of
* further transforming or splitting field values to keep the * further transforming or splitting field values to keep the
@ -362,40 +324,48 @@ abstract class CSVLoader {
} }
/** load the CSV input */ /** load the CSV input */
void load(Reader input) throws IOException { public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream) throws IOException {
Reader reader = input; errHeader = "CSVLoader: input=" + stream.getSourceInfo();
if (skipLines>0) { Reader reader = null;
if (!(reader instanceof BufferedReader)) { try {
reader = new BufferedReader(reader); reader = stream.getReader();
if (skipLines>0) {
if (!(reader instanceof BufferedReader)) {
reader = new BufferedReader(reader);
}
BufferedReader r = (BufferedReader)reader;
for (int i=0; i<skipLines; i++) {
r.readLine();
}
} }
BufferedReader r = (BufferedReader)reader;
for (int i=0; i<skipLines; i++) {
r.readLine();
}
}
CSVParser parser = new CSVParser(reader, strategy); CSVParser parser = new CSVParser(reader, strategy);
// parse the fieldnames from the header of the file // parse the fieldnames from the header of the file
if (fieldnames==null) {
fieldnames = parser.getLine();
if (fieldnames==null) { if (fieldnames==null) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Expected fieldnames in CSV input"); fieldnames = parser.getLine();
} if (fieldnames==null) {
prepareFields(); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"Expected fieldnames in CSV input");
} }
prepareFields();
// read the rest of the CSV file
for(;;) {
int line = parser.getLineNumber(); // for error reporting in MT mode
String[] vals = parser.getLine();
if (vals==null) break;
if (vals.length != fields.length) {
input_err("expected "+fields.length+" values but got "+vals.length, vals, line);
} }
addDoc(line,vals); // read the rest of the CSV file
for(;;) {
int line = parser.getLineNumber(); // for error reporting in MT mode
String[] vals = parser.getLine();
if (vals==null) break;
if (vals.length != fields.length) {
input_err("expected "+fields.length+" values but got "+vals.length, vals, line);
}
addDoc(line,vals);
}
} finally{
if (reader != null) {
IOUtils.closeQuietly(reader);
}
} }
} }

View File

@ -0,0 +1,68 @@
package org.apache.solr.handler;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
/**
 * Common request-handling skeleton for update handlers that consume
 * {@link org.apache.solr.common.util.ContentStream} objects.
 * <p>
 * The handler creates an {@link UpdateRequestProcessor} from the chain named by the
 * {@link UpdateParams#UPDATE_PROCESSOR} request parameter, asks the subclass for a
 * {@link ContentStreamLoader} via {@link #newLoader}, feeds every content stream of the
 * request to that loader, and finally applies any commit/rollback requested through the
 * request parameters.
 **/
public abstract class ContentStreamHandlerBase extends RequestHandlerBase {

  /**
   * Loads every content stream of the request through the loader supplied by
   * {@link #newLoader}, then handles commit/rollback parameters.
   *
   * @param req the incoming request
   * @param rsp the response being built
   * @throws SolrException with {@code BAD_REQUEST} when the request carries no content
   *         stream and neither a commit nor a rollback was performed from the parameters
   * @throws Exception anything raised by the loader or the update processor
   */
  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
    final SolrParams params = req.getParams();
    final String chainName = params.get(UpdateParams.UPDATE_PROCESSOR);
    final UpdateRequestProcessorChain chain = req.getCore().getUpdateProcessingChain(chainName);
    final UpdateRequestProcessor processor = chain.createProcessor(req, rsp);

    try {
      final ContentStreamLoader loader = newLoader(req, processor);
      final Iterable<ContentStream> contentStreams = req.getContentStreams();

      if (contentStreams != null) {
        for (ContentStream contentStream : contentStreams) {
          loader.load(req, rsp, contentStream);
        }
        // The parameters may additionally ask for a commit and/or a rollback.
        RequestHandlerUtils.handleCommit(processor, params, false);
        RequestHandlerUtils.handleRollback(processor, params, false);
      } else {
        // Without a body the request is only meaningful as a commit or a rollback.
        // The rollback is attempted only when no commit was handled.
        boolean handled = RequestHandlerUtils.handleCommit(processor, params, false);
        if (!handled) {
          handled = RequestHandlerUtils.handleRollback(processor, params, false);
        }
        if (!handled) {
          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
        }
      }
    } finally {
      // Always finish the request, whether or not loading succeeded.
      processor.finish();
    }
  }

  /**
   * Creates the loader used to consume the content streams of a single request.
   *
   * @param req the request being handled
   * @param processor the update processor created for this request
   * @return the loader that will receive each content stream of the request
   */
  protected abstract ContentStreamLoader newLoader(SolrQueryRequest req, UpdateRequestProcessor processor);
}

View File

@ -0,0 +1,51 @@
package org.apache.solr.handler;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
/**
 * Base class for components that load a single
 * {@link org.apache.solr.common.util.ContentStream} into Solr.
 * <p>
 * Besides the abstract {@link #load} operation, the class carries an "error header"
 * string that is exposed through a getter/setter pair and is directly accessible to
 * subclasses.
 **/
public abstract class ContentStreamLoader {

  /** Error header text; readable and writable by subclasses and through the accessors. */
  protected String errHeader;

  /**
   * Loads the given stream. Loaders are responsible for closing the stream.
   *
   * @param req The input {@link org.apache.solr.request.SolrQueryRequest}
   * @param rsp The response, in case the Loader wishes to add anything
   * @param stream The {@link org.apache.solr.common.util.ContentStream} to add
   * @throws Exception if the stream cannot be loaded
   */
  public abstract void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream) throws Exception;

  /**
   * Replaces the current error header.
   *
   * @param errHeader the new error header value
   */
  public void setErrHeader(String errHeader) {
    this.errHeader = errHeader;
  }

  /**
   * @return the current error header, as last assigned
   */
  public String getErrHeader() {
    return this.errHeader;
  }
}

View File

@ -0,0 +1,318 @@
package org.apache.solr.handler;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.UpdateParams;
import org.apache.commons.io.IOUtils;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLInputFactory;
import javax.xml.transform.TransformerConfigurationException;
import java.io.Reader;
import java.io.StringReader;
import java.io.IOException;
/**
*
*
**/
class XMLLoader extends ContentStreamLoader {
protected UpdateRequestProcessor processor;
private XMLInputFactory inputFactory;
public XMLLoader(UpdateRequestProcessor processor, XMLInputFactory inputFactory) {
this.processor = processor;
this.inputFactory = inputFactory;
}
public void load(SolrQueryRequest req, SolrQueryResponse rsp, ContentStream stream) throws Exception {
errHeader = "XMLLoader: " + stream.getSourceInfo();
Reader reader = null;
try {
reader = stream.getReader();
if (XmlUpdateRequestHandler.log.isTraceEnabled()) {
String body = IOUtils.toString(reader);
XmlUpdateRequestHandler.log.trace("body", body);
reader = new StringReader(body);
}
XMLStreamReader parser = inputFactory.createXMLStreamReader(reader);
this.processUpdate(processor, parser);
}
catch (XMLStreamException e) {
//Hmmm, not quite right
throw new IOException(e.getMessage());
} finally {
IOUtils.closeQuietly(reader);
}
}
/**
* @since solr 1.2
*/
void processUpdate(UpdateRequestProcessor processor, XMLStreamReader parser)
throws XMLStreamException, IOException, FactoryConfigurationError,
InstantiationException, IllegalAccessException,
TransformerConfigurationException {
AddUpdateCommand addCmd = null;
while (true) {
int event = parser.next();
switch (event) {
case XMLStreamConstants.END_DOCUMENT:
parser.close();
return;
case XMLStreamConstants.START_ELEMENT:
String currTag = parser.getLocalName();
if (currTag.equals(XmlUpdateRequestHandler.ADD)) {
XmlUpdateRequestHandler.log.trace("SolrCore.update(add)");
addCmd = new AddUpdateCommand();
boolean overwrite = true; // the default
Boolean overwritePending = null;
Boolean overwriteCommitted = null;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if (XmlUpdateRequestHandler.OVERWRITE.equals(attrName)) {
overwrite = StrUtils.parseBoolean(attrVal);
} else if (XmlUpdateRequestHandler.ALLOW_DUPS.equals(attrName)) {
overwrite = !StrUtils.parseBoolean(attrVal);
} else if (XmlUpdateRequestHandler.COMMIT_WITHIN.equals(attrName)) {
addCmd.commitWithin = Integer.parseInt(attrVal);
} else if (XmlUpdateRequestHandler.OVERWRITE_PENDING.equals(attrName)) {
overwritePending = StrUtils.parseBoolean(attrVal);
} else if (XmlUpdateRequestHandler.OVERWRITE_COMMITTED.equals(attrName)) {
overwriteCommitted = StrUtils.parseBoolean(attrVal);
} else {
XmlUpdateRequestHandler.log.warn("Unknown attribute id in add:" + attrName);
}
}
// check if these flags are set
if (overwritePending != null && overwriteCommitted != null) {
if (overwritePending != overwriteCommitted) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"can't have different values for 'overwritePending' and 'overwriteCommitted'");
}
overwrite = overwritePending;
}
addCmd.overwriteCommitted = overwrite;
addCmd.overwritePending = overwrite;
addCmd.allowDups = !overwrite;
} else if ("doc".equals(currTag)) {
XmlUpdateRequestHandler.log.trace("adding doc...");
addCmd.clear();
addCmd.solrDoc = readDoc(parser);
processor.processAdd(addCmd);
} else if (XmlUpdateRequestHandler.COMMIT.equals(currTag) || XmlUpdateRequestHandler.OPTIMIZE.equals(currTag)) {
XmlUpdateRequestHandler.log.trace("parsing " + currTag);
CommitUpdateCommand cmd = new CommitUpdateCommand(XmlUpdateRequestHandler.OPTIMIZE.equals(currTag));
boolean sawWaitSearcher = false, sawWaitFlush = false;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if (XmlUpdateRequestHandler.WAIT_FLUSH.equals(attrName)) {
cmd.waitFlush = StrUtils.parseBoolean(attrVal);
sawWaitFlush = true;
} else if (XmlUpdateRequestHandler.WAIT_SEARCHER.equals(attrName)) {
cmd.waitSearcher = StrUtils.parseBoolean(attrVal);
sawWaitSearcher = true;
} else if (UpdateParams.MAX_OPTIMIZE_SEGMENTS.equals(attrName)) {
cmd.maxOptimizeSegments = Integer.parseInt(attrVal);
} else {
XmlUpdateRequestHandler.log.warn("unexpected attribute commit/@" + attrName);
}
}
// If waitFlush is specified and waitSearcher wasn't, then
// clear waitSearcher.
if (sawWaitFlush && !sawWaitSearcher) {
cmd.waitSearcher = false;
}
processor.processCommit(cmd);
} // end commit
else if (XmlUpdateRequestHandler.ROLLBACK.equals(currTag)) {
XmlUpdateRequestHandler.log.trace("parsing " + currTag);
RollbackUpdateCommand cmd = new RollbackUpdateCommand();
processor.processRollback(cmd);
} // end rollback
else if (XmlUpdateRequestHandler.DELETE.equals(currTag)) {
XmlUpdateRequestHandler.log.trace("parsing delete");
processDelete(processor, parser);
} // end delete
break;
}
}
}
/**
* @since solr 1.3
*/
void processDelete(UpdateRequestProcessor processor, XMLStreamReader parser) throws XMLStreamException, IOException {
// Parse the command
DeleteUpdateCommand deleteCmd = new DeleteUpdateCommand();
deleteCmd.fromPending = true;
deleteCmd.fromCommitted = true;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if ("fromPending".equals(attrName)) {
deleteCmd.fromPending = StrUtils.parseBoolean(attrVal);
} else if ("fromCommitted".equals(attrName)) {
deleteCmd.fromCommitted = StrUtils.parseBoolean(attrVal);
} else {
XmlUpdateRequestHandler.log.warn("unexpected attribute delete/@" + attrName);
}
}
StringBuilder text = new StringBuilder();
while (true) {
int event = parser.next();
switch (event) {
case XMLStreamConstants.START_ELEMENT:
String mode = parser.getLocalName();
if (!("id".equals(mode) || "query".equals(mode))) {
XmlUpdateRequestHandler.log.warn("unexpected XML tag /delete/" + mode);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"unexpected XML tag /delete/" + mode);
}
text.setLength(0);
break;
case XMLStreamConstants.END_ELEMENT:
String currTag = parser.getLocalName();
if ("id".equals(currTag)) {
deleteCmd.id = text.toString();
} else if ("query".equals(currTag)) {
deleteCmd.query = text.toString();
} else if ("delete".equals(currTag)) {
return;
} else {
XmlUpdateRequestHandler.log.warn("unexpected XML tag /delete/" + currTag);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"unexpected XML tag /delete/" + currTag);
}
processor.processDelete(deleteCmd);
break;
// Add everything to the text
case XMLStreamConstants.SPACE:
case XMLStreamConstants.CDATA:
case XMLStreamConstants.CHARACTERS:
text.append(parser.getText());
break;
}
}
}
/**
* Given the input stream, read a document
*
* @since solr 1.3
*/
SolrInputDocument readDoc(XMLStreamReader parser) throws XMLStreamException {
SolrInputDocument doc = new SolrInputDocument();
String attrName = "";
for (int i = 0; i < parser.getAttributeCount(); i++) {
attrName = parser.getAttributeLocalName(i);
if ("boost".equals(attrName)) {
doc.setDocumentBoost(Float.parseFloat(parser.getAttributeValue(i)));
} else {
XmlUpdateRequestHandler.log.warn("Unknown attribute doc/@" + attrName);
}
}
StringBuilder text = new StringBuilder();
String name = null;
float boost = 1.0f;
boolean isNull = false;
while (true) {
int event = parser.next();
switch (event) {
// Add everything to the text
case XMLStreamConstants.SPACE:
case XMLStreamConstants.CDATA:
case XMLStreamConstants.CHARACTERS:
text.append(parser.getText());
break;
case XMLStreamConstants.END_ELEMENT:
if ("doc".equals(parser.getLocalName())) {
return doc;
} else if ("field".equals(parser.getLocalName())) {
if (!isNull) {
doc.addField(name, text.toString(), boost);
boost = 1.0f;
}
}
break;
case XMLStreamConstants.START_ELEMENT:
text.setLength(0);
String localName = parser.getLocalName();
if (!"field".equals(localName)) {
XmlUpdateRequestHandler.log.warn("unexpected XML tag doc/" + localName);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"unexpected XML tag doc/" + localName);
}
boost = 1.0f;
String attrVal = "";
for (int i = 0; i < parser.getAttributeCount(); i++) {
attrName = parser.getAttributeLocalName(i);
attrVal = parser.getAttributeValue(i);
if ("name".equals(attrName)) {
name = attrVal;
} else if ("boost".equals(attrName)) {
boost = Float.parseFloat(attrVal);
} else if ("null".equals(attrName)) {
isNull = StrUtils.parseBoolean(attrVal);
} else {
XmlUpdateRequestHandler.log.warn("Unknown attribute doc/field/@" + attrName);
}
}
break;
}
}
}
}

View File

@ -17,50 +17,31 @@
package org.apache.solr.handler; package org.apache.solr.handler;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
import java.io.File;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javanet.staxutils.BaseXMLInputFactory; import javanet.staxutils.BaseXMLInputFactory;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.TransformerConfigurationException;
import org.apache.commons.io.IOUtils;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.MapSolrParams; import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.XML; import org.apache.solr.common.util.XML;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase; import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.Reader;
import java.io.Writer;
import java.util.HashMap;
/** /**
* Add documents to solr using the STAX XML parser. * Add documents to solr using the STAX XML parser.
*/ */
public class XmlUpdateRequestHandler extends RequestHandlerBase public class XmlUpdateRequestHandler extends ContentStreamHandlerBase {
{
public static Logger log = LoggerFactory.getLogger(XmlUpdateRequestHandler.class); public static Logger log = LoggerFactory.getLogger(XmlUpdateRequestHandler.class);
public static final String UPDATE_PROCESSOR = "update.processor"; public static final String UPDATE_PROCESSOR = "update.processor";
@ -73,21 +54,20 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
public static final String ROLLBACK = "rollback"; public static final String ROLLBACK = "rollback";
public static final String WAIT_SEARCHER = "waitSearcher"; public static final String WAIT_SEARCHER = "waitSearcher";
public static final String WAIT_FLUSH = "waitFlush"; public static final String WAIT_FLUSH = "waitFlush";
public static final String OVERWRITE = "overwrite"; public static final String OVERWRITE = "overwrite";
public static final String COMMIT_WITHIN = "commitWithin"; public static final String COMMIT_WITHIN = "commitWithin";
public static final String OVERWRITE_COMMITTED = "overwriteCommitted"; // @Deprecated public static final String OVERWRITE_COMMITTED = "overwriteCommitted"; // @Deprecated
public static final String OVERWRITE_PENDING = "overwritePending"; // @Deprecated public static final String OVERWRITE_PENDING = "overwritePending"; // @Deprecated
public static final String ALLOW_DUPS = "allowDups"; public static final String ALLOW_DUPS = "allowDups";
XMLInputFactory inputFactory; XMLInputFactory inputFactory;
@Override @Override
public void init(NamedList args) public void init(NamedList args) {
{
super.init(args); super.init(args);
inputFactory = BaseXMLInputFactory.newInstance(); inputFactory = BaseXMLInputFactory.newInstance();
try { try {
// The java 1.6 bundled stax parser (sjsxp) does not currently have a thread-safe // The java 1.6 bundled stax parser (sjsxp) does not currently have a thread-safe
@ -98,305 +78,25 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
// have thread-safe factories. // have thread-safe factories.
inputFactory.setProperty("reuse-instance", Boolean.FALSE); inputFactory.setProperty("reuse-instance", Boolean.FALSE);
} }
catch( IllegalArgumentException ex ) { catch (IllegalArgumentException ex) {
// Other implementations will likely throw this exception since "reuse-instance" // Other implementations will likely throw this exception since "reuse-instance"
// is implementation specific. // is implementation specific.
log.debug( "Unable to set the 'reuse-instance' property for the input chain: "+inputFactory ); log.debug("Unable to set the 'reuse-instance' property for the input chain: " + inputFactory);
}
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
{
SolrParams params = req.getParams();
UpdateRequestProcessorChain processingChain =
req.getCore().getUpdateProcessingChain( params.get( UpdateParams.UPDATE_PROCESSOR ) );
UpdateRequestProcessor processor = processingChain.createProcessor(req, rsp);
Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) {
if (!RequestHandlerUtils.handleCommit(processor, params, false) && !RequestHandlerUtils.handleRollback(processor, params, false)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
}
}
else {
// Cycle through each stream
for( ContentStream stream : req.getContentStreams() ) {
Reader reader = stream.getReader();
try {
if( log.isTraceEnabled() ) {
String body = IOUtils.toString( reader );
log.trace( "body", body );
reader = new StringReader( body );
}
XMLStreamReader parser = inputFactory.createXMLStreamReader(reader);
this.processUpdate( processor, parser );
}
finally {
IOUtils.closeQuietly(reader);
}
}
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false );
// Perhaps rollback from the parameters
RequestHandlerUtils.handleRollback( processor, params, false );
}
// finish the request
processor.finish();
}
/**
* @since solr 1.2
*/
void processUpdate( UpdateRequestProcessor processor, XMLStreamReader parser)
throws XMLStreamException, IOException, FactoryConfigurationError,
InstantiationException, IllegalAccessException,
TransformerConfigurationException
{
AddUpdateCommand addCmd = null;
while (true) {
int event = parser.next();
switch (event) {
case XMLStreamConstants.END_DOCUMENT:
parser.close();
return;
case XMLStreamConstants.START_ELEMENT:
String currTag = parser.getLocalName();
if (currTag.equals(ADD)) {
log.trace("SolrCore.update(add)");
addCmd = new AddUpdateCommand();
boolean overwrite=true; // the default
Boolean overwritePending = null;
Boolean overwriteCommitted = null;
for (int i=0; i<parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if (OVERWRITE.equals(attrName)) {
overwrite = StrUtils.parseBoolean(attrVal);
} else if (ALLOW_DUPS.equals(attrName)) {
overwrite = !StrUtils.parseBoolean(attrVal);
} else if ( COMMIT_WITHIN.equals(attrName) ) {
addCmd.commitWithin = Integer.parseInt( attrVal );
} else if ( OVERWRITE_PENDING.equals(attrName) ) {
overwritePending = StrUtils.parseBoolean(attrVal);
} else if ( OVERWRITE_COMMITTED.equals(attrName) ) {
overwriteCommitted = StrUtils.parseBoolean(attrVal);
} else {
log.warn("Unknown attribute id in add:" + attrName);
}
}
// check if these flags are set
if( overwritePending != null && overwriteCommitted != null ) {
if( overwritePending != overwriteCommitted ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"can't have different values for 'overwritePending' and 'overwriteCommitted'" );
}
overwrite=overwritePending;
}
addCmd.overwriteCommitted = overwrite;
addCmd.overwritePending = overwrite;
addCmd.allowDups = !overwrite;
}
else if ("doc".equals(currTag)) {
log.trace("adding doc...");
addCmd.clear();
addCmd.solrDoc = readDoc( parser );
processor.processAdd(addCmd);
}
else if ( COMMIT.equals(currTag) || OPTIMIZE.equals(currTag)) {
log.trace("parsing " + currTag);
CommitUpdateCommand cmd = new CommitUpdateCommand(OPTIMIZE.equals(currTag));
boolean sawWaitSearcher = false, sawWaitFlush = false;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if (WAIT_FLUSH.equals(attrName)) {
cmd.waitFlush = StrUtils.parseBoolean(attrVal);
sawWaitFlush = true;
} else if (WAIT_SEARCHER.equals(attrName)) {
cmd.waitSearcher = StrUtils.parseBoolean(attrVal);
sawWaitSearcher = true;
} else if (UpdateParams.MAX_OPTIMIZE_SEGMENTS.equals(attrName)){
cmd.maxOptimizeSegments = Integer.parseInt(attrVal);
}
else {
log.warn("unexpected attribute commit/@" + attrName);
}
}
// If waitFlush is specified and waitSearcher wasn't, then
// clear waitSearcher.
if (sawWaitFlush && !sawWaitSearcher) {
cmd.waitSearcher = false;
}
processor.processCommit( cmd );
} // end commit
else if ( ROLLBACK.equals(currTag) ) {
log.trace("parsing " + currTag);
RollbackUpdateCommand cmd = new RollbackUpdateCommand();
processor.processRollback( cmd );
} // end rollback
else if (DELETE.equals(currTag)) {
log.trace("parsing delete");
processDelete( processor, parser);
} // end delete
break;
}
} }
} }
/** protected ContentStreamLoader newLoader(SolrQueryRequest req, UpdateRequestProcessor processor) {
* @since solr 1.3 return new XMLLoader(processor, inputFactory);
*/
void processDelete(UpdateRequestProcessor processor, XMLStreamReader parser) throws XMLStreamException, IOException
{
// Parse the command
DeleteUpdateCommand deleteCmd = new DeleteUpdateCommand();
deleteCmd.fromPending = true;
deleteCmd.fromCommitted = true;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if ("fromPending".equals(attrName)) {
deleteCmd.fromPending = StrUtils.parseBoolean(attrVal);
} else if ("fromCommitted".equals(attrName)) {
deleteCmd.fromCommitted = StrUtils.parseBoolean(attrVal);
} else {
log.warn("unexpected attribute delete/@" + attrName);
}
}
StringBuilder text = new StringBuilder();
while (true) {
int event = parser.next();
switch (event) {
case XMLStreamConstants.START_ELEMENT:
String mode = parser.getLocalName();
if (!("id".equals(mode) || "query".equals(mode))) {
log.warn("unexpected XML tag /delete/" + mode);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"unexpected XML tag /delete/" + mode);
}
text.setLength( 0 );
break;
case XMLStreamConstants.END_ELEMENT:
String currTag = parser.getLocalName();
if ("id".equals(currTag)) {
deleteCmd.id = text.toString();
} else if ("query".equals(currTag)) {
deleteCmd.query = text.toString();
} else if( "delete".equals( currTag ) ) {
return;
} else {
log.warn("unexpected XML tag /delete/" + currTag);
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"unexpected XML tag /delete/" + currTag);
}
processor.processDelete( deleteCmd );
break;
// Add everything to the text
case XMLStreamConstants.SPACE:
case XMLStreamConstants.CDATA:
case XMLStreamConstants.CHARACTERS:
text.append( parser.getText() );
break;
}
}
} }
/**
 * Given the input stream, read a document.
 * <p>
 * The <code>boost</code> attribute of the current element becomes the document
 * boost; other attributes are logged as warnings. Each <code>field</code> child
 * is added to the document under its <code>name</code> attribute with its text
 * content and optional <code>boost</code>, unless it is flagged with
 * <code>null="true"</code>, in which case it is skipped. The method returns
 * when the closing <code>doc</code> tag is read.
 *
 * @param parser stream reader; its attributes are read immediately, so it is
 *               presumably positioned on the doc start element - confirm
 *               against the caller
 * @return the populated document
 * @throws SolrException with <code>BAD_REQUEST</code> when a child element other
 *                       than <code>field</code> is encountered
 * @throws XMLStreamException propagated from the parser
 * @since solr 1.3
 */
SolrInputDocument readDoc(XMLStreamReader parser) throws XMLStreamException {
  SolrInputDocument doc = new SolrInputDocument();

  for (int i = 0; i < parser.getAttributeCount(); i++) {
    String attrName = parser.getAttributeLocalName(i);
    if ("boost".equals(attrName)) {
      doc.setDocumentBoost( Float.parseFloat(parser.getAttributeValue(i)) );
    } else {
      log.warn("Unknown attribute doc/@" + attrName);
    }
  }

  // State of the <field> element currently being read.
  StringBuilder text = new StringBuilder();
  String name = null;
  float boost = 1.0f;
  boolean isNull = false;
  while (true) {
    int event = parser.next();
    switch (event) {
      // Add everything to the text
      case XMLStreamConstants.SPACE:
      case XMLStreamConstants.CDATA:
      case XMLStreamConstants.CHARACTERS:
        text.append( parser.getText() );
        break;

      case XMLStreamConstants.END_ELEMENT:
        if ("doc".equals(parser.getLocalName())) {
          return doc;
        }
        else if ("field".equals(parser.getLocalName())) {
          if (!isNull) {
            doc.addField(name, text.toString(), boost );
            boost = 1.0f;
          }
        }
        break;

      case XMLStreamConstants.START_ELEMENT:
        text.setLength(0);
        String localName = parser.getLocalName();
        if (!"field".equals(localName)) {
          log.warn("unexpected XML tag doc/" + localName);
          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
                  "unexpected XML tag doc/" + localName);
        }
        // Per-field flags must not leak from the previous field: without
        // resetting isNull, one null="true" field would cause every later
        // field lacking a "null" attribute to be dropped as well.
        boost = 1.0f;
        isNull = false;
        for (int i = 0; i < parser.getAttributeCount(); i++) {
          String attrName = parser.getAttributeLocalName(i);
          String attrVal = parser.getAttributeValue(i);
          if ("name".equals(attrName)) {
            name = attrVal;
          } else if ("boost".equals(attrName)) {
            boost = Float.parseFloat(attrVal);
          } else if ("null".equals(attrName)) {
            isNull = StrUtils.parseBoolean(attrVal);
          } else {
            log.warn("Unknown attribute doc/field/@" + attrName);
          }
        }
        break;
    }
  }
}
/** /**
* A Convenience method for getting back a simple XML string indicating * A Convenience method for getting back a simple XML string indicating
* success or failure from an XML formatted Update (from the Reader) * success or failure from an XML formatted Update (from the Reader)
* *
* @since solr 1.2 * @since solr 1.2
*
* @deprecated Use * @deprecated Use
* {@link #processUpdate(UpdateRequestProcessor, XMLStreamReader)} * {@link XMLLoader#processUpdate(UpdateRequestProcessor, XMLStreamReader)}
* instead. * instead.
*/ */
@Deprecated @Deprecated
@ -405,17 +105,19 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
SolrCore core = SolrCore.getSolrCore(); SolrCore core = SolrCore.getSolrCore();
// Old style requests do not choose a custom handler // Old style requests do not choose a custom handler
UpdateRequestProcessorChain processorFactory = core.getUpdateProcessingChain( null ); UpdateRequestProcessorChain processorFactory = core.getUpdateProcessingChain(null);
SolrParams params = new MapSolrParams( new HashMap<String, String>() ); SolrParams params = new MapSolrParams(new HashMap<String, String>());
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {}; SolrQueryRequestBase req = new SolrQueryRequestBase(core, params) {
};
SolrQueryResponse rsp = new SolrQueryResponse(); // ignored SolrQueryResponse rsp = new SolrQueryResponse(); // ignored
XMLStreamReader parser = inputFactory.createXMLStreamReader(input); XMLStreamReader parser = inputFactory.createXMLStreamReader(input);
UpdateRequestProcessor processor = processorFactory.createProcessor(req, rsp); UpdateRequestProcessor processor = processorFactory.createProcessor(req, rsp);
this.processUpdate( processor, parser ); XMLLoader loader = (XMLLoader) newLoader(req, processor);
loader.processUpdate(processor, parser);
processor.finish(); processor.finish();
output.write("<result status=\"0\"></result>"); output.write("<result status=\"0\"></result>");
} }
catch (Exception ex) { catch (Exception ex) {
try { try {
SolrException.logOnce(log, "Error processing \"legacy\" update command", ex); SolrException.logOnce(log, "Error processing \"legacy\" update command", ex);
@ -425,7 +127,6 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
} }
} }
} }
//////////////////////// SolrInfoMBeans methods ////////////////////// //////////////////////// SolrInfoMBeans methods //////////////////////
@Override @Override

View File

@ -47,8 +47,9 @@ public class XmlUpdateRequestHandlerTest extends AbstractSolrTestCase
XMLStreamReader parser = XMLStreamReader parser =
inputFactory.createXMLStreamReader( new StringReader( xml ) ); inputFactory.createXMLStreamReader( new StringReader( xml ) );
parser.next(); // read the START document... parser.next(); // read the START document...
//null for the processor is all right here
SolrInputDocument doc = handler.readDoc( parser ); XMLLoader loader = new XMLLoader(null, inputFactory);
SolrInputDocument doc = loader.readDoc( parser );
// Read boosts // Read boosts
assertEquals( 5.5f, doc.getDocumentBoost() ); assertEquals( 5.5f, doc.getDocumentBoost() );