SOLR-269 -- adding UpdateRequestProcessor as a top level plugin managed by SolrCore. This sets up a default processing chain.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@556064 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan McKinley 2007-07-13 16:48:00 +00:00
parent 4bd34517da
commit b6ca445deb
19 changed files with 847 additions and 216 deletions

View File

@ -73,10 +73,10 @@ New Features
within a single request. For example, sending:
<delete><id>1</id><id>2</id></delete> will delete both 1 and 2. (ryan)
11. SOLR-269: Added UpdateRequestProcessor to the XmlUpdateRequestHandler.
This provides a reasonable place to pre-process documents after they are
parsed and before they are committed to the index. This is a good place
for custom document manipulation or document based authorization. (ryan)
11. SOLR-269: Added UpdateRequestProcessor plugin framework. This provides
a reasonable place to process documents after they are parsed and
before they are committed to the index. This is a good place for custom
document manipulation or document based authorization. (yonik, ryan)
12. SOLR-260: Converting to a standard PluginLoader framework. This reworks
RequestHandlers, FieldTypes, and QueryResponseWriters to share the same

View File

@ -39,4 +39,7 @@ public interface UpdateParams
/** Commit everything after the command completes */
public static String OPTIMIZE = "optimize";
/** Select the update processor to use. A RequestHandler may or may not respect this parameter */
public static final String UPDATE_PROCESSOR = "update.processor";
}

View File

@ -192,7 +192,7 @@ public class Config {
private static final String project = "solr";
private static final String base = "org.apache" + "." + project;
private static final String[] packages = {"","analysis.","schema.","handler.","search.","update.","core.","request.","util."};
private static final String[] packages = {"","analysis.","schema.","handler.","search.","update.","core.","request.","update.processor.","util."};
public static Class findClass(String cname, String... subpackages) {
ClassLoader loader = getClassLoader();

View File

@ -126,7 +126,7 @@ final class RequestHandlers {
{
final RequestHandlers handlers = this;
AbstractPluginLoader<SolrRequestHandler> loader =
new AbstractPluginLoader<SolrRequestHandler>( "[solrconfig.xml] requestHandler", true )
new AbstractPluginLoader<SolrRequestHandler>( "[solrconfig.xml] requestHandler", true, true )
{
@Override
protected SolrRequestHandler create( String name, String className, Node node ) throws Exception

View File

@ -57,7 +57,10 @@ import org.apache.solr.update.DirectUpdateHandler;
import org.apache.solr.update.SolrIndexConfig;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.processor.ChainedUpdateProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.AbstractPluginLoader;
import org.apache.solr.util.plugin.NamedListPluginLoader;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@ -79,6 +82,7 @@ public final class SolrCore {
private static final long startTime = System.currentTimeMillis();
private final RequestHandlers reqHandlers = new RequestHandlers();
private final SolrHighlighter highlighter;
private final Map<String,UpdateRequestProcessorFactory> updateProcessors;
public long getStartTime() { return startTime; }
@ -209,6 +213,9 @@ public final class SolrCore {
initWriters();
// Processors initialized before the handlers
updateProcessors = loadUpdateProcessors();
reqHandlers.initHandlersFromConfig( SolrConfig.config );
// TODO? could select the highlighter implementation
@ -230,6 +237,51 @@ public final class SolrCore {
}
}
/**
* Load the request processors configured in solrconfig.xml
*/
private Map<String, UpdateRequestProcessorFactory> loadUpdateProcessors() {
final Map<String,UpdateRequestProcessorFactory> map = new HashMap<String, UpdateRequestProcessorFactory>();
// If this is a more general use-case, this could be a regular type
AbstractPluginLoader<UpdateRequestProcessorFactory> loader
= new AbstractPluginLoader<UpdateRequestProcessorFactory>( "updateRequestProcessor" ) {
@Override
protected void init(UpdateRequestProcessorFactory plugin, Node node) throws Exception {
plugin.init( node );
}
@Override
protected UpdateRequestProcessorFactory register(String name, UpdateRequestProcessorFactory plugin) throws Exception {
return map.put( name, plugin );
}
};
NodeList nodes = (NodeList)SolrConfig.config.evaluate("updateRequestProcessor/factory", XPathConstants.NODESET);
UpdateRequestProcessorFactory def = loader.load( nodes );
if( def == null ) {
def = new ChainedUpdateProcessorFactory(); // the default
def.init( null );
}
map.put( null, def );
map.put( "", def );
return map;
}
/**
* @return an update processor registered to the given name. Throw an exception if this factory is undefined
*/
public UpdateRequestProcessorFactory getUpdateProcessorFactory( String name )
{
UpdateRequestProcessorFactory factory = updateProcessors.get( name );
if( factory == null ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"unknown UpdateProcessorFactory: "+name );
}
return factory;
}
public void close() {
log.info("CLOSING SolrCore!");
try {

View File

@ -26,6 +26,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
/**
* Common helper functions for RequestHandlers
@ -46,7 +47,12 @@ public class RequestHandlerUtils
/**
* Check the request parameters and decide if it should commit or optimize.
* If it does, it will check parameters for "waitFlush" and "waitSearcher"
*
* Use the update processor version
*
* @since solr 1.2
*/
@Deprecated
public static boolean handleCommit( SolrQueryRequest req, SolrQueryResponse rsp, boolean force ) throws IOException
{
SolrParams params = req.getParams();
@ -74,4 +80,28 @@ public class RequestHandlerUtils
}
return false;
}
/**
* Check the request parameters and decide if it should commit or optimize.
* If it does, it will check parameters for "waitFlush" and "waitSearcher"
*/
public static boolean handleCommit( UpdateRequestProcessor processor, SolrParams params, boolean force ) throws IOException
{
if( params == null ) {
params = new MapSolrParams( new HashMap<String, String>() );
}
boolean optimize = params.getBool( UpdateParams.OPTIMIZE, false );
boolean commit = params.getBool( UpdateParams.COMMIT, false );
if( optimize || commit || force ) {
CommitUpdateCommand cmd = new CommitUpdateCommand( optimize );
cmd.waitFlush = params.getBool( UpdateParams.WAIT_FLUSH, cmd.waitFlush );
cmd.waitSearcher = params.getBool( UpdateParams.WAIT_SEARCHER, cmd.waitSearcher );
processor.processCommit( cmd );
return true;
}
return false;
}
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.UpdateHandler;
/**
* This is a good place for subclassed update handlers to process the document before it is
* indexed. You may wish to add/remove fields or check if the requested user is allowed to
* update the given document...
*
* Perhaps you continue adding an error message (without indexing the document)...
* perhaps you throw an error and halt indexing (remove anything already indexed??)
*
* This implementation (the default) passes the request command (as is) to the updateHandler
* and adds debug info to the response.
*
* @since solr 1.3
*/
public class UpdateRequestProcessor
{
public static Logger log = Logger.getLogger(UpdateRequestProcessor.class.getName());
protected final SolrQueryRequest req;
protected final UpdateHandler updateHandler;
protected final long startTime;
protected final NamedList<Object> response;
// hold on to the added list for logging and the response
protected List<Object> addedIds;
public UpdateRequestProcessor( SolrQueryRequest req )
{
this.req = req;
this.updateHandler = req.getCore().getUpdateHandler();
this.startTime = System.currentTimeMillis();
this.response = new NamedList<Object>();
}
/**
* @return The response information
*/
public NamedList<Object> finish()
{
long elapsed = System.currentTimeMillis() - startTime;
log.info( "update"+response+" 0 " + (elapsed) );
return response;
}
public void processDelete( DeleteUpdateCommand cmd ) throws IOException
{
if( cmd.id != null ) {
updateHandler.delete( cmd );
response.add( "delete", cmd.id );
}
else {
updateHandler.deleteByQuery( cmd );
response.add( "deleteByQuery", cmd.query );
}
}
public void processCommit( CommitUpdateCommand cmd ) throws IOException
{
updateHandler.commit(cmd);
response.add(cmd.optimize ? "optimize" : "commit", "");
}
public void processAdd( AddUpdateCommand cmd, SolrInputDocument doc ) throws IOException
{
// Add a list of added id's to the response
if( addedIds == null ) {
addedIds = new ArrayList<Object>();
response.add( "added", addedIds );
}
IndexSchema schema = req.getSchema();
SchemaField uniqueKeyField = schema.getUniqueKeyField();
Object id = null;
if (uniqueKeyField != null) {
SolrInputField f = doc.getField( uniqueKeyField.getName() );
if( f != null ) {
id = f.getFirstValue();
}
}
addedIds.add( id );
cmd.doc = DocumentBuilder.toDocument( doc, schema );
updateHandler.addDoc(cmd);
}
}

View File

@ -37,11 +37,11 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.XML;
import org.apache.solr.core.Config;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
@ -49,25 +49,17 @@ import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
/**
* Add documents to solr using the STAX XML parser.
*
* To change the UpdateRequestProcessor implementation, add the configuration parameter:
*
* <requestHandler name="/update" class="solr.StaxUpdateRequestHandler" >
* <str name="update.processor.class">org.apache.solr.handler.UpdateRequestProcessor</str>
* <lst name="update.processor.args">
* ... (optionally pass in arguments to the factory init method) ...
* </lst>
* </requestHandler>
*/
public class XmlUpdateRequestHandler extends RequestHandlerBase
{
public static Logger log = Logger.getLogger(XmlUpdateRequestHandler.class.getName());
public static final String UPDATE_PROCESSOR_FACTORY = "update.processor.factory";
public static final String UPDATE_PROCESSOR_ARGS = "update.processor.args";
public static final String UPDATE_PROCESSOR = "update.processor";
// XML Constants
public static final String ADD = "add";
@ -76,7 +68,6 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
public static final String COMMIT = "commit";
public static final String WAIT_SEARCHER = "waitSearcher";
public static final String WAIT_FLUSH = "waitFlush";
public static final String MODE = "mode";
public static final String OVERWRITE = "overwrite";
public static final String OVERWRITE_COMMITTED = "overwriteCommitted"; // @Deprecated
@ -84,7 +75,6 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
public static final String ALLOW_DUPS = "allowDups";
private XMLInputFactory inputFactory;
private UpdateRequestProcessorFactory processorFactory;
@SuppressWarnings("unchecked")
@Override
@ -92,68 +82,60 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
{
super.init(args);
inputFactory = BaseXMLInputFactory.newInstance();
// Initialize the UpdateRequestProcessorFactory
NamedList<Object> factoryargs = null;
if( args != null ) {
String className = (String)args.get( UPDATE_PROCESSOR_FACTORY );
factoryargs = (NamedList<Object>)args.get( UPDATE_PROCESSOR_ARGS );
if( className != null ) {
processorFactory = (UpdateRequestProcessorFactory)Config.newInstance( className, new String[]{} );
}
}
if( processorFactory == null ) {
processorFactory = new UpdateRequestProcessorFactory();
}
processorFactory.init( factoryargs );
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
{
Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) {
if( !RequestHandlerUtils.handleCommit(req, rsp, false) ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" );
}
return;
}
RequestHandlerUtils.addExperimentalFormatWarning( rsp );
// Cycle through each stream
for( ContentStream stream : req.getContentStreams() ) {
Reader reader = stream.getReader();
try {
NamedList out = this.update( req, req.getCore(), reader );
rsp.add( "update", out );
}
finally {
IOUtils.closeQuietly(reader);
SolrParams params = req.getParams();
UpdateRequestProcessorFactory processorFactory =
req.getCore().getUpdateProcessorFactory( params.get( UpdateParams.UPDATE_PROCESSOR ) );
UpdateRequestProcessor processor = processorFactory.getInstance(req, rsp, null);
Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) {
if( !RequestHandlerUtils.handleCommit(processor, params, false) ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" );
}
}
else {
// Cycle through each stream
for( ContentStream stream : req.getContentStreams() ) {
Reader reader = stream.getReader();
try {
XMLStreamReader parser = inputFactory.createXMLStreamReader(reader);
this.processUpdate( processor, parser );
}
finally {
IOUtils.closeQuietly(reader);
}
}
// perhaps commit when we are done
RequestHandlerUtils.handleCommit(req, rsp, false);
// Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false );
}
// finish the request
processor.finish();
}
/**
* @since solr 1.2
*/
NamedList<Object> processUpdate( SolrQueryRequest req, SolrCore core, XMLStreamReader parser)
void processUpdate( UpdateRequestProcessor processor, XMLStreamReader parser)
throws XMLStreamException, IOException, FactoryConfigurationError,
InstantiationException, IllegalAccessException,
TransformerConfigurationException
{
UpdateRequestProcessor processor = processorFactory.getInstance( req );
AddUpdateCommand addCmd = null;
while (true) {
int event = parser.next();
switch (event) {
case XMLStreamConstants.END_DOCUMENT:
parser.close();
return processor.finish();
return;
case XMLStreamConstants.START_ELEMENT:
String currTag = parser.getLocalName();
@ -170,8 +152,6 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
String attrVal = parser.getAttributeValue(i);
if (OVERWRITE.equals(attrName)) {
overwrite = StrUtils.parseBoolean(attrVal);
// } else if (MODE.equals(attrName)) {
// addCmd.mode = SolrPluginUtils.parseAndValidateFieldModes(attrVal,schema);
} else if (ALLOW_DUPS.equals(attrName)) {
overwrite = !StrUtils.parseBoolean(attrVal);
} else if ( OVERWRITE_PENDING.equals(attrName) ) {
@ -197,9 +177,9 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
}
else if ("doc".equals(currTag)) {
log.finest("adding doc...");
addCmd.indexedId = null;
SolrInputDocument doc = readDoc( parser );
processor.processAdd( addCmd, doc );
addCmd.clear();
addCmd.solrDoc = readDoc( parser );
processor.processAdd(addCmd);
}
else if ( COMMIT.equals(currTag) || OPTIMIZE.equals(currTag)) {
log.finest("parsing " + currTag);
@ -370,14 +350,6 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
}
}
/**
* @since solr 1.2
*/
public NamedList<Object> update( SolrQueryRequest req, SolrCore core, Reader reader) throws Exception {
XMLStreamReader parser = inputFactory.createXMLStreamReader(reader);
return processUpdate( req, core, parser);
}
/**
* A Convenience method for getting back a simple XML string indicating
* success or failure from an XML formated Update (from the Reader)
@ -388,9 +360,17 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
public void doLegacyUpdate(Reader input, Writer output) {
try {
SolrCore core = SolrCore.getSolrCore();
// Old style requests do not choose a custom handler
UpdateRequestProcessorFactory processorFactory = core.getUpdateProcessorFactory( null );
SolrParams params = new MapSolrParams( new HashMap<String, String>() );
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
this.update( req, SolrCore.getSolrCore(), input);
SolrQueryResponse rsp = new SolrQueryResponse(); // ignored
XMLStreamReader parser = inputFactory.createXMLStreamReader(input);
UpdateRequestProcessor processor = processorFactory.getInstance(req, rsp, null);
this.processUpdate( processor, parser );
processor.finish();
output.write("<result status=\"0\"></result>");
}
catch (Exception ex) {

View File

@ -18,6 +18,12 @@
package org.apache.solr.update;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
/**
* @version $Id$
*/
@ -26,16 +32,80 @@ public class AddUpdateCommand extends UpdateCommand {
// it will be obtained from the doc.
public String indexedId;
// The Lucene document to be indexed
public Document doc;
// Higher level SolrInputDocument, normally used to construct the Lucene Document
// to index.
public SolrInputDocument solrDoc;
public boolean allowDups;
public boolean overwritePending;
public boolean overwriteCommitted;
/** Reset state to reuse this object with a different document in the same request */
public void clear() {
doc = null;
solrDoc = null;
indexedId = null;
}
public SolrInputDocument getSolrInputDocument() {
return solrDoc;
}
public Document getLuceneDocument(IndexSchema schema) {
if (doc == null && solrDoc != null) {
// TODO?? build the doc from the SolrDocument?
}
return doc;
}
public String getIndexedId(IndexSchema schema) {
if (indexedId == null) {
SchemaField sf = schema.getUniqueKeyField();
if (sf != null) {
if (doc != null) {
schema.getUniqueKeyField();
Field storedId = doc.getField(sf.getName());
indexedId = sf.getType().storedToIndexed(storedId);
}
if (solrDoc != null) {
SolrInputField field = solrDoc.getField(sf.getName());
if (field != null) {
indexedId = sf.getType().toInternal( field.getFirstValue().toString() );
}
}
}
}
return indexedId;
}
public String getPrintableId(IndexSchema schema) {
SchemaField sf = schema.getUniqueKeyField();
if (indexedId != null) {
return schema.getUniqueKeyField().getType().indexedToReadable(indexedId);
}
if (doc != null) {
return schema.printableUniqueKey(doc);
}
if (solrDoc != null) {
SolrInputField field = solrDoc.getField(sf.getName());
if (field != null) {
return field.getFirstValue().toString();
}
}
return "(null)";
}
public AddUpdateCommand() {
super("add");
}
public String toString() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(commandName);
sb.append(':');
if (indexedId !=null) sb.append("id=").append(indexedId);

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.util.ArrayList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.util.plugin.AbstractPluginLoader;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
/**
* An UpdateRequestProcessorFactory that constructs a chain of UpdateRequestProcessor.
*
* This is the default implementation and can be configured via solrconfig.xml with:
*
* <updateRequestProcessor>
* <factory name="standard" class="solr.ChainedUpdateProcessorFactory" >
* <chain class="PathToClass1" />
* <chain class="PathToClass2" />
* <chain class="solr.LogUpdateProcessorFactory" >
* <int name="maxNumToLog">100</int>
* </chain>
* <chain class="solr.RunUpdateProcessorFactory" />
* </factory>
* </updateRequestProcessor>
*
* @since solr 1.3
*/
public class ChainedUpdateProcessorFactory extends UpdateRequestProcessorFactory
{
UpdateRequestProcessorFactory[] factory;
@Override
public void init( Node node ) {
final ArrayList<UpdateRequestProcessorFactory> factories = new ArrayList<UpdateRequestProcessorFactory>();
if( node != null ) {
// Load and initialize the plugin chain
AbstractPluginLoader<UpdateRequestProcessorFactory> loader
= new AbstractPluginLoader<UpdateRequestProcessorFactory>( "processor chain", false, false ) {
@Override
protected void init(UpdateRequestProcessorFactory plugin, Node node) throws Exception {
plugin.init( node );
}
@Override
protected UpdateRequestProcessorFactory register(String name, UpdateRequestProcessorFactory plugin) throws Exception {
factories.add( plugin );
return null;
}
};
XPath xpath = XPathFactory.newInstance().newXPath();
try {
loader.load( (NodeList) xpath.evaluate( "chain", node, XPathConstants.NODESET ) );
}
catch (XPathExpressionException e) {
throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
"Error loading processor chain: " + node,e,false);
}
}
// If not configured, make sure it has the default settings
if( factories.size() < 1 ) {
factories.add( new RunUpdateProcessorFactory() );
factories.add( new LogUpdateProcessorFactory() );
}
factory = factories.toArray( new UpdateRequestProcessorFactory[factories.size()] );
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)
{
UpdateRequestProcessor processor = null;
for (int i = factory.length-1; i>=0; i--) {
processor = factory[i].getInstance(req, rsp, processor);
}
return processor;
}
}

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.DOMUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.w3c.dom.Node;
/**
* A logging processor. This keeps track of all commands that have passed through
* the chain and prints them on finish();
*
* If the Log level is not INFO the processor will not be created or added to the chain
*
* @since solr 1.3
*/
public class LogUpdateProcessorFactory extends UpdateRequestProcessorFactory {
int maxNumToLog = 8;
@Override
public void init( Node node ) {
if( node != null ) {
NamedList<Object> args = DOMUtil.childNodesToNamedList( node );
SolrParams params = SolrParams.toSolrParams( args );
maxNumToLog = params.getInt( "maxNumToLog", maxNumToLog );
}
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
boolean doLog = LogUpdateProcessor.log.isLoggable(Level.INFO);
LogUpdateProcessor.log.severe("Will Log=" + doLog);
if( doLog ) {
// only create the log processor if we will use it
return new LogUpdateProcessor(req, rsp, this, next);
}
return null;
}
}
class LogUpdateProcessor extends UpdateRequestProcessor {
private final SolrQueryRequest req;
private final SolrQueryResponse rsp;
private final UpdateRequestProcessor next;
private final NamedList<Object> toLog;
int numAdds;
int numDeletes;
// hold on to the added list for logging and the response
private List<String> adds;
private List<String> deletes;
private final int maxNumToLog;
public LogUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, LogUpdateProcessorFactory factory, UpdateRequestProcessor next) {
this.req = req;
this.rsp = rsp;
this.next = next;
maxNumToLog = factory.maxNumToLog; // TODO: make configurable
// TODO: make log level configurable as well, or is that overkill?
// (ryan) maybe? I added it mostly to show that it *can* be configurable
this.toLog = new NamedList<Object>();
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (next != null) next.processAdd(cmd);
// Add a list of added id's to the response
if (adds == null) {
adds = new ArrayList<String>();
toLog.add("add",adds);
}
if (adds.size() < maxNumToLog) {
adds.add(cmd.getPrintableId(req.getSchema()));
}
numAdds++;
}
@Override
public void processDelete( DeleteUpdateCommand cmd ) throws IOException {
if (next != null) next.processDelete(cmd);
if (cmd.id != null) {
if (deletes == null) {
deletes = new ArrayList<String>();
toLog.add("delete",deletes);
}
if (deletes.size() < maxNumToLog) {
deletes.add(cmd.id);
}
} else {
if (toLog.size() < maxNumToLog) {
toLog.add("deleteByQuery", cmd.query);
}
}
numDeletes++;
}
@Override
public void processCommit( CommitUpdateCommand cmd ) throws IOException {
if (next != null) next.processCommit(cmd);
toLog.add(cmd.optimize ? "optimize" : "commit", "");
}
@Override
public void finish() throws IOException {
if (next != null) next.finish();
// TODO: right now, update requests are logged twice...
// this will slow down things compared to Solr 1.2
// we should have extra log info on the SolrQueryResponse, to
// be logged by SolrCore
// if id lists were truncated, show how many more there were
if (numAdds > maxNumToLog) {
adds.add("...(" + (numAdds-adds.size()) + " more)");
}
if (numDeletes > maxNumToLog) {
deletes.add("...(" + (numDeletes-deletes.size()) + " more)");
}
long elapsed = rsp.getEndTime() - req.getStartTime();
log.info( ""+toLog + " 0 " + (elapsed) );
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
/**
* A simple processor that does nothing but passes on the command to the next
* processor in the chain.
*
* @since solr 1.3
*/
public abstract class NoOpUpdateProcessor extends UpdateRequestProcessor
{
protected final UpdateRequestProcessor next;
public NoOpUpdateProcessor( UpdateRequestProcessor next) {
this.next = next;
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (next != null) next.processAdd(cmd);
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if (next != null) next.processDelete(cmd);
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException
{
if (next != null) next.processCommit(cmd);
}
@Override
public void finish() throws IOException {
if (next != null) next.finish();
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.UpdateHandler;
import org.w3c.dom.Node;
/**
* Pass the command to the UpdateHandler without any modifications
*
* @since solr 1.3
*/
public class RunUpdateProcessorFactory extends UpdateRequestProcessorFactory
{
@Override
public void init( Node node ) {
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next)
{
return new RunUpdateProcessor(req, next);
}
}
class RunUpdateProcessor extends NoOpUpdateProcessor
{
private final SolrQueryRequest req;
private final UpdateHandler updateHandler;
public RunUpdateProcessor(SolrQueryRequest req, UpdateRequestProcessor next) {
super( next );
this.req = req;
this.updateHandler = req.getCore().getUpdateHandler();
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
cmd.doc = DocumentBuilder.toDocument(cmd.getSolrInputDocument(), req.getSchema());
updateHandler.addDoc(cmd);
super.processAdd(cmd);
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if( cmd.id != null ) {
updateHandler.delete(cmd);
}
else {
updateHandler.deleteByQuery(cmd);
}
super.processDelete(cmd);
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException
{
updateHandler.commit(cmd);
super.processCommit(cmd);
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.logging.Logger;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
/**
* This is a good place for subclassed update handlers to process the document before it is
* indexed. You may wish to add/remove fields or check if the requested user is allowed to
* update the given document...
*
* Perhaps you continue adding an error message (without indexing the document)...
* perhaps you throw an error and halt indexing (remove anything already indexed??)
*
* This implementation (the default) passes the request command (as is) to the updateHandler
* and adds debug info to the response.
*
* @since solr 1.3
*/
public abstract class UpdateRequestProcessor {
protected static Logger log = Logger.getLogger(UpdateRequestProcessor.class.getName());
public abstract void processAdd(AddUpdateCommand cmd) throws IOException;
public abstract void processDelete(DeleteUpdateCommand cmd) throws IOException;
public abstract void processCommit(CommitUpdateCommand cmd) throws IOException;
public abstract void finish() throws IOException;
}

View File

@ -15,32 +15,24 @@
* limitations under the License.
*/
package org.apache.solr.handler;
package org.apache.solr.update.processor;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.w3c.dom.Node;
/**
* A factory to generate UpdateRequestProcessors for each request. The default
* implementation does nothing except pass the commands directly to the
* UpdateHandler
* A factory to generate UpdateRequestProcessors for each request.
*
* @since solr 1.3
*/
public class UpdateRequestProcessorFactory
public abstract class UpdateRequestProcessorFactory
{
public UpdateRequestProcessorFactory()
public void init( Node node )
{
// could process the Node
}
public void init( NamedList<Object> args )
{
// by default nothing...
}
public UpdateRequestProcessor getInstance( SolrQueryRequest req )
{
return new UpdateRequestProcessor( req );
}
abstract public UpdateRequestProcessor getInstance(
SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next );
}

View File

@ -40,20 +40,22 @@ public abstract class AbstractPluginLoader<T>
private final String type;
private final boolean preRegister;
private final boolean requireName;
/**
* @param type is the 'type' name included in error messages.
* @param preRegister, if true, this will first register all Plugins, then it will initialize them.
*/
public AbstractPluginLoader( String type, boolean preRegister )
public AbstractPluginLoader( String type, boolean preRegister, boolean requireName )
{
this.type = type;
this.preRegister = preRegister;
this.requireName = requireName;
}
public AbstractPluginLoader( String type )
{
this( type, false );
this( type, false, true );
}
/**
@ -130,7 +132,7 @@ public abstract class AbstractPluginLoader<T>
// In a production environment, we can tolerate an error in some request handlers,
// still load the others, and have a working system.
try {
String name = DOMUtil.getAttr(node,"name", type);
String name = DOMUtil.getAttr(node,"name", requireName?type:null);
String className = DOMUtil.getAttr(node,"class", type);
String defaultStr = DOMUtil.getAttr(node,"default", null );

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.util.DOMUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.w3c.dom.Node;
/**
* A custom class to do custom stuff
*/
public class CustomUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
{
public NamedList args = null;
@Override
public void init( Node node )
{
if( node != null ) {
args = DOMUtil.childNodesToNamedList( node );
}
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.processor.ChainedUpdateProcessorFactory;
import org.apache.solr.util.AbstractSolrTestCase;
/**
*
*/
public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
@Override public String getSchemaFile() { return "schema.xml"; }
@Override public String getSolrConfigFile() { return "solrconfig-transformers.xml"; }
public void testConfiguration() throws Exception
{
SolrCore core = SolrCore.getSolrCore();
// make sure it loaded the factories
ChainedUpdateProcessorFactory chained =
(ChainedUpdateProcessorFactory)core.getUpdateProcessorFactory( "standard" );
// Make sure it got 3 items and configured the Log factory ok
assertEquals( 3, chained.factory.length );
LogUpdateProcessorFactory log = (LogUpdateProcessorFactory)chained.factory[0];
assertEquals( 100, log.maxNumToLog );
CustomUpdateRequestProcessorFactory custom =
(CustomUpdateRequestProcessorFactory)core.getUpdateProcessorFactory( null );
assertEquals( custom, core.getUpdateProcessorFactory( "" ) );
assertEquals( custom, core.getUpdateProcessorFactory( "custom" ) );
// Make sure the NamedListArgs got through ok
assertEquals( "{name={n8=88,n9=99}}", custom.args.toString() );
}
}

View File

@ -0,0 +1,53 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<requestHandler name="custom" class="solr.CustomUpdateRequestHandler" >
<str name="update.processor.factory">custom</str>
</requestHandler>
<updateRequestProcessor>
<factory name="standard" class="solr.ChainedUpdateProcessorFactory" >
<chain class="solr.LogUpdateProcessorFactory" >
<int name="maxNumToLog">100</int>
</chain>
<chain class="solr.CustomUpdateRequestProcessorFactory" >
<lst name="name">
<str name="n1">x1</str>
<str name="n2">x2</str>
</lst>
</chain>
<chain class="solr.CustomUpdateRequestProcessorFactory" >
<lst name="name">
<str name="nA">xA</str>
<str name="nA">xA</str>
</lst>
</chain>
</factory>
<factory name="custom" class="solr.CustomUpdateRequestProcessorFactory" default="true" >
<lst name="name">
<str name="n8">88</str>
<str name="n9">99</str>
</lst>
</factory>
</updateRequestProcessor>
</config>