SOLR-670 -- Add support for rollbacks in UpdateHandler.

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@704853 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2008-10-15 10:38:56 +00:00
parent fc15f6f3a6
commit eb6e94c679
15 changed files with 364 additions and 8 deletions

View File

@ -53,6 +53,9 @@ New Features
similar to the global autoCommit maxTime argument except that it is set for similar to the global autoCommit maxTime argument except that it is set for
each request. (ryan) each request. (ryan)
6. SOLR-670: Add support for rollbacks in UpdateHandler. This allows user to rollback all changes
since the last commit. (Noble Paul, koji via shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -93,6 +93,10 @@ public abstract class SolrServer implements Serializable
return new UpdateRequest().setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments ).process( this ); return new UpdateRequest().setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments ).process( this );
} }
public UpdateResponse rollback() throws SolrServerException, IOException {
return new UpdateRequest().rollback().process( this );
}
public UpdateResponse deleteById(String id) throws SolrServerException, IOException { public UpdateResponse deleteById(String id) throws SolrServerException, IOException {
return new UpdateRequest().deleteById( id ).process( this ); return new UpdateRequest().deleteById( id ).process( this );
} }

View File

@ -141,6 +141,17 @@ public class UpdateRequest extends SolrRequest
return this; return this;
} }
/**
* @since Solr 1.4
*/
public UpdateRequest rollback() {
if (params == null)
params = new ModifiableSolrParams();
params.set( UpdateParams.ROLLBACK, "true" );
return this;
}
public void setParam(String param, String value) { public void setParam(String param, String value) {
if (params == null) if (params == null)

View File

@ -37,9 +37,12 @@ public interface UpdateParams
/** Commit everything after the command completes */ /** Commit everything after the command completes */
public static String COMMIT = "commit"; public static String COMMIT = "commit";
/** Commit everything after the command completes */ /** Optimize the index and commit everything after the command completes */
public static String OPTIMIZE = "optimize"; public static String OPTIMIZE = "optimize";
/** Rollback update commands */
public static String ROLLBACK = "rollback";
/** Select the update processor to use. A RequestHandler may or may not respect this parameter */ /** Select the update processor to use. A RequestHandler may or may not respect this parameter */
public static final String UPDATE_PROCESSOR = "update.processor"; public static final String UPDATE_PROCESSOR = "update.processor";
/** /**

View File

@ -59,8 +59,8 @@ public class CSVRequestHandler extends RequestHandlerBase {
Iterable<ContentStream> streams = req.getContentStreams(); Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) { if( streams == null ) {
if( !RequestHandlerUtils.handleCommit(processor, params, false) ) { if (!RequestHandlerUtils.handleCommit(processor, params, false) && !RequestHandlerUtils.handleRollback(processor, params, false)) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" ); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
} }
} }
else { else {
@ -77,6 +77,7 @@ public class CSVRequestHandler extends RequestHandlerBase {
// Perhaps commit from the parameters // Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false ); RequestHandlerUtils.handleCommit( processor, params, false );
RequestHandlerUtils.handleRollback(processor, params, false );
} }
} finally { } finally {
// finish the request // finish the request

View File

@ -26,6 +26,7 @@ import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
/** /**
@ -106,4 +107,23 @@ public class RequestHandlerUtils
} }
return false; return false;
} }
/**
* @since Solr 1.4
*/
public static boolean handleRollback( UpdateRequestProcessor processor, SolrParams params, boolean force ) throws IOException
{
if( params == null ) {
params = new MapSolrParams( new HashMap<String, String>() );
}
boolean rollback = params.getBool( UpdateParams.ROLLBACK, false );
if( rollback || force ) {
RollbackUpdateCommand cmd = new RollbackUpdateCommand();
processor.processRollback( cmd );
return true;
}
return false;
}
} }

View File

@ -52,6 +52,7 @@ import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessorChain; import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessor; import org.apache.solr.update.processor.UpdateRequestProcessor;
@ -69,6 +70,7 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
public static final String DELETE = "delete"; public static final String DELETE = "delete";
public static final String OPTIMIZE = "optimize"; public static final String OPTIMIZE = "optimize";
public static final String COMMIT = "commit"; public static final String COMMIT = "commit";
public static final String ROLLBACK = "rollback";
public static final String WAIT_SEARCHER = "waitSearcher"; public static final String WAIT_SEARCHER = "waitSearcher";
public static final String WAIT_FLUSH = "waitFlush"; public static final String WAIT_FLUSH = "waitFlush";
@ -113,8 +115,8 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
UpdateRequestProcessor processor = processingChain.createProcessor(req, rsp); UpdateRequestProcessor processor = processingChain.createProcessor(req, rsp);
Iterable<ContentStream> streams = req.getContentStreams(); Iterable<ContentStream> streams = req.getContentStreams();
if( streams == null ) { if( streams == null ) {
if( !RequestHandlerUtils.handleCommit(processor, params, false) ) { if (!RequestHandlerUtils.handleCommit(processor, params, false) && !RequestHandlerUtils.handleRollback(processor, params, false)) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "missing content stream" ); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "missing content stream");
} }
} }
else { else {
@ -138,6 +140,8 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
// Perhaps commit from the parameters // Perhaps commit from the parameters
RequestHandlerUtils.handleCommit( processor, params, false ); RequestHandlerUtils.handleCommit( processor, params, false );
// Perhaps rollback from the parameters
RequestHandlerUtils.handleRollback( processor, params, false );
} }
// finish the request // finish the request
@ -236,6 +240,13 @@ public class XmlUpdateRequestHandler extends RequestHandlerBase
} }
processor.processCommit( cmd ); processor.processCommit( cmd );
} // end commit } // end commit
else if ( ROLLBACK.equals(currTag) ) {
log.trace("parsing " + currTag);
RollbackUpdateCommand cmd = new RollbackUpdateCommand();
processor.processRollback( cmd );
} // end rollback
else if (DELETE.equals(currTag)) { else if (DELETE.equals(currTag)) {
log.trace("parsing delete"); log.trace("parsing delete");
processDelete( processor, parser); processDelete( processor, parser);

View File

@ -188,7 +188,7 @@ public class DirectUpdateHandler extends UpdateHandler {
if (!cmd.fromPending && !cmd.fromCommitted) if (!cmd.fromPending && !cmd.fromCommitted)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"meaningless command: " + cmd); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"meaningless command: " + cmd);
if (!cmd.fromPending || !cmd.fromCommitted) if (!cmd.fromPending || !cmd.fromCommitted)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"operation not supported" + cmd); throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,"operation not supported: " + cmd);
Query q = QueryParsing.parseQuery(cmd.query, schema); Query q = QueryParsing.parseQuery(cmd.query, schema);
@ -262,6 +262,13 @@ public class DirectUpdateHandler extends UpdateHandler {
return; return;
} }
/**
* @since Solr 1.4
*/
public void rollback(RollbackUpdateCommand cmd) throws IOException {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"DirectUpdateHandler doesn't support rollback. Use DirectUpdateHandler2 instead.");
}
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////

View File

@ -125,6 +125,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
AtomicLong deleteByQueryCommandsCumulative= new AtomicLong(); AtomicLong deleteByQueryCommandsCumulative= new AtomicLong();
AtomicLong commitCommands= new AtomicLong(); AtomicLong commitCommands= new AtomicLong();
AtomicLong optimizeCommands= new AtomicLong(); AtomicLong optimizeCommands= new AtomicLong();
AtomicLong rollbackCommands= new AtomicLong();
AtomicLong numDocsPending= new AtomicLong(); AtomicLong numDocsPending= new AtomicLong();
AtomicLong numErrors = new AtomicLong(); AtomicLong numErrors = new AtomicLong();
AtomicLong numErrorsCumulative = new AtomicLong(); AtomicLong numErrorsCumulative = new AtomicLong();
@ -176,6 +177,12 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
} }
// must only be called when iwCommit lock held
protected void rollbackWriter() throws IOException {
numDocsPending.set(0);
if (writer!=null) writer.rollback();
}
public int addDoc(AddUpdateCommand cmd) throws IOException { public int addDoc(AddUpdateCommand cmd) throws IOException {
addCommands.incrementAndGet(); addCommands.incrementAndGet();
addCommandsCumulative.incrementAndGet(); addCommandsCumulative.incrementAndGet();
@ -370,6 +377,38 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
} }
/**
* @since Solr 1.4
*/
public void rollback(RollbackUpdateCommand cmd) throws IOException {
rollbackCommands.incrementAndGet();
boolean error=true;
iwCommit.lock();
try {
log.info("start "+cmd);
rollbackWriter();
//callPostRollbackCallbacks();
// reset commit tracking
tracker.didRollback();
log.info("end_rollback");
error=false;
}
finally {
iwCommit.unlock();
addCommands.set(0);
deleteByIdCommands.set(0);
deleteByQueryCommands.set(0);
numErrors.set(error ? 1 : 0);
}
}
public void close() throws IOException { public void close() throws IOException {
log.info("closing " + this); log.info("closing " + this);
@ -467,6 +506,15 @@ public class DirectUpdateHandler2 extends UpdateHandler {
docsSinceCommit = 0; docsSinceCommit = 0;
} }
/** Inform tracker that a rollback has occurred, cancel any pending commits */
public void didRollback() {
if( pending != null ) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit = 0;
}
/** This is the worker part for the ScheduledFuture **/ /** This is the worker part for the ScheduledFuture **/
public synchronized void run() { public synchronized void run() {
long started = System.currentTimeMillis(); long started = System.currentTimeMillis();
@ -556,6 +604,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
lst.add("autocommits", tracker.autoCommitCount); lst.add("autocommits", tracker.autoCommitCount);
lst.add("optimizes", optimizeCommands.get()); lst.add("optimizes", optimizeCommands.get());
lst.add("rollbacks", rollbackCommands.get());
lst.add("docsPending", numDocsPending.get()); lst.add("docsPending", numDocsPending.get());
// pset.size() not synchronized, but it should be fine to access. // pset.size() not synchronized, but it should be fine to access.
// lst.add("deletesPending", pset.size()); // lst.add("deletesPending", pset.size());

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
/**
* @version $Id$
* @since Solr 1.4
*/
public class RollbackUpdateCommand extends UpdateCommand {
public RollbackUpdateCommand() {
super("rollback");
}
}

View File

@ -156,6 +156,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
public abstract void delete(DeleteUpdateCommand cmd) throws IOException; public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
public abstract void deleteByQuery(DeleteUpdateCommand cmd) throws IOException; public abstract void deleteByQuery(DeleteUpdateCommand cmd) throws IOException;
public abstract void commit(CommitUpdateCommand cmd) throws IOException; public abstract void commit(CommitUpdateCommand cmd) throws IOException;
public abstract void rollback(RollbackUpdateCommand cmd) throws IOException;
public abstract void close() throws IOException; public abstract void close() throws IOException;

View File

@ -29,6 +29,7 @@ import org.apache.solr.request.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
/** /**
* A logging processor. This keeps track of all commands that have passed through * A logging processor. This keeps track of all commands that have passed through
@ -131,6 +132,16 @@ class LogUpdateProcessor extends UpdateRequestProcessor {
toLog.add(cmd.optimize ? "optimize" : "commit", ""); toLog.add(cmd.optimize ? "optimize" : "commit", "");
} }
/**
* @since Solr 1.4
*/
@Override
public void processRollback( RollbackUpdateCommand cmd ) throws IOException {
if (next != null) next.processRollback(cmd);
toLog.add("rollback", "");
}
@Override @Override
public void finish() throws IOException { public void finish() throws IOException {

View File

@ -25,6 +25,7 @@ import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.DocumentBuilder; import org.apache.solr.update.DocumentBuilder;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.UpdateHandler; import org.apache.solr.update.UpdateHandler;
@ -77,6 +78,16 @@ class RunUpdateProcessor extends UpdateRequestProcessor
updateHandler.commit(cmd); updateHandler.commit(cmd);
super.processCommit(cmd); super.processCommit(cmd);
} }
/**
* @since Solr 1.4
*/
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException
{
updateHandler.rollback(cmd);
super.processRollback(cmd);
}
} }

View File

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
/** /**
@ -60,6 +61,14 @@ public abstract class UpdateRequestProcessor {
if (next != null) next.processCommit(cmd); if (next != null) next.processCommit(cmd);
} }
/**
* @since Solr 1.4
*/
public void processRollback(RollbackUpdateCommand cmd) throws IOException
{
if (next != null) next.processRollback(cmd);
}
public void finish() throws IOException { public void finish() throws IOException {
if (next != null) next.finish(); if (next != null) next.finish();
} }

View File

@ -17,12 +17,19 @@
package org.apache.solr.update; package org.apache.solr.update;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index; import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.Field.Store;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.util.AbstractSolrTestCase; import org.apache.solr.util.AbstractSolrTestCase;
/** /**
@ -75,4 +82,182 @@ public class DirectUpdateHandlerTest extends AbstractSolrTestCase {
catch( SolrException ex ) { } // expected catch( SolrException ex ) { } // expected
} }
public void testUncommit() throws Exception {
addSimpleDoc("A");
// search - not committed - "A" should not be found.
Map<String,String> args = new HashMap<String, String>();
args.put( CommonParams.Q, "id:A" );
args.put( "indent", "true" );
SolrQueryRequest req = new LocalSolrQueryRequest( h.getCore(), new MapSolrParams( args) );
assertQ("\"A\" should not be found.", req
,"//*[@numFound='0']"
);
}
public void testAddCommit() throws Exception {
addSimpleDoc("A");
// commit "A"
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
CommitUpdateCommand cmtCmd = new CommitUpdateCommand(false);
cmtCmd.waitSearcher = true;
updater.commit(cmtCmd);
// search - "A" should be found.
Map<String,String> args = new HashMap<String, String>();
args.put( CommonParams.Q, "id:A" );
args.put( "indent", "true" );
SolrQueryRequest req = new LocalSolrQueryRequest( core, new MapSolrParams( args) );
assertQ("\"A\" should be found.", req
,"//*[@numFound='1']"
,"//result/doc[1]/int[@name='id'][.='A']"
);
}
public void testDeleteCommit() throws Exception {
addSimpleDoc("A");
addSimpleDoc("B");
// commit "A", "B"
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
CommitUpdateCommand cmtCmd = new CommitUpdateCommand(false);
cmtCmd.waitSearcher = true;
updater.commit(cmtCmd);
// search - "A","B" should be found.
Map<String,String> args = new HashMap<String, String>();
args.put( CommonParams.Q, "id:A OR id:B" );
args.put( "indent", "true" );
SolrQueryRequest req = new LocalSolrQueryRequest( core, new MapSolrParams( args) );
assertQ("\"A\" and \"B\" should be found.", req
,"//*[@numFound='2']"
,"//result/doc[1]/int[@name='id'][.='A']"
,"//result/doc[2]/int[@name='id'][.='B']"
);
// delete "B"
deleteSimpleDoc("B");
// search - "A","B" should be found.
assertQ("\"A\" and \"B\" should be found.", req
,"//*[@numFound='2']"
,"//result/doc[1]/int[@name='id'][.='A']"
,"//result/doc[2]/int[@name='id'][.='B']"
);
// commit
updater.commit(cmtCmd);
// search - "B" should not be found.
assertQ("\"B\" should not be found.", req
,"//*[@numFound='1']"
,"//result/doc[1]/int[@name='id'][.='A']"
);
}
public void testAddRollback() throws Exception {
addSimpleDoc("A");
// commit "A"
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
CommitUpdateCommand cmtCmd = new CommitUpdateCommand(false);
cmtCmd.waitSearcher = true;
updater.commit(cmtCmd);
addSimpleDoc("B");
// rollback "B"
RollbackUpdateCommand rbkCmd = new RollbackUpdateCommand();
updater.rollback(rbkCmd);
updater.commit(cmtCmd);
// search - "B" should not be found.
Map<String,String> args = new HashMap<String, String>();
args.put( CommonParams.Q, "id:A OR id:B" );
args.put( "indent", "true" );
SolrQueryRequest req = new LocalSolrQueryRequest( core, new MapSolrParams( args) );
assertQ("\"B\" should not be found.", req
,"//*[@numFound='1']"
,"//result/doc[1]/int[@name='id'][.='A']"
);
}
public void testDeleteRollback() throws Exception {
addSimpleDoc("A");
addSimpleDoc("B");
// commit "A", "B"
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
CommitUpdateCommand cmtCmd = new CommitUpdateCommand(false);
cmtCmd.waitSearcher = true;
updater.commit(cmtCmd);
// search - "A","B" should be found.
Map<String,String> args = new HashMap<String, String>();
args.put( CommonParams.Q, "id:A OR id:B" );
args.put( "indent", "true" );
SolrQueryRequest req = new LocalSolrQueryRequest( core, new MapSolrParams( args) );
assertQ("\"A\" and \"B\" should be found.", req
,"//*[@numFound='2']"
,"//result/doc[1]/int[@name='id'][.='A']"
,"//result/doc[2]/int[@name='id'][.='B']"
);
// delete "B"
deleteSimpleDoc("B");
// search - "A","B" should be found.
assertQ("\"A\" and \"B\" should be found.", req
,"//*[@numFound='2']"
,"//result/doc[1]/int[@name='id'][.='A']"
,"//result/doc[2]/int[@name='id'][.='B']"
);
// rollback "B"
RollbackUpdateCommand rbkCmd = new RollbackUpdateCommand();
updater.rollback(rbkCmd);
updater.commit(cmtCmd);
// search - "B" should be found.
assertQ("\"B\" should be found.", req
,"//*[@numFound='2']"
,"//result/doc[1]/int[@name='id'][.='A']"
,"//result/doc[2]/int[@name='id'][.='B']"
);
}
private void addSimpleDoc(String id) throws Exception {
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
AddUpdateCommand cmd = new AddUpdateCommand();
cmd.overwriteCommitted = true;
cmd.overwritePending = true;
cmd.allowDups = false;
// Add a document
cmd.doc = new Document();
cmd.doc.add( new Field( "id", id, Store.YES, Index.UN_TOKENIZED ) );
updater.addDoc( cmd );
}
private void deleteSimpleDoc(String id) throws Exception {
SolrCore core = h.getCore();
UpdateHandler updater = core.getUpdateHandler();
// Delete the document
DeleteUpdateCommand cmd = new DeleteUpdateCommand();
cmd.id = id;
cmd.fromCommitted = true;
cmd.fromPending = true;
updater.delete(cmd);
}
} }