SOLR-2748: CommitTracker fixes

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1166866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2011-09-08 19:27:40 +00:00
parent 9cae51e472
commit 013e2776b8
3 changed files with 116 additions and 66 deletions

View File

@ -321,6 +321,13 @@ Documentation
================== 3.5.0 ================== ================== 3.5.0 ==================
Bug Fixes
----------------------
* SOLR-2748: The CommitTracker used for commitWith or autoCommit by maxTime
could commit too frequently and could block adds until a new seaercher was
registered. (yonik)
Other Changes Other Changes
---------------------- ----------------------
@ -328,6 +335,8 @@ Documentation
ints masquerading as booleans. Preferred constructor now accepts a single int ints masquerading as booleans. Preferred constructor now accepts a single int
bitfield (Chris Male) bitfield (Chris Male)
================== 3.4.0 ================== ================== 3.4.0 ==================
Upgrading from Solr 3.3 Upgrading from Solr 3.3

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.LocalSolrQueryRequest;
@ -43,7 +44,7 @@ final class CommitTracker implements Runnable {
protected final static Logger log = LoggerFactory.getLogger(CommitTracker.class); protected final static Logger log = LoggerFactory.getLogger(CommitTracker.class);
// scheduler delay for maxDoc-triggered autocommits // scheduler delay for maxDoc-triggered autocommits
public final int DOC_COMMIT_DELAY_MS = 250; public final int DOC_COMMIT_DELAY_MS = 1;
// settings, not final so we can change them in testing // settings, not final so we can change them in testing
private int docsUpperBound; private int docsUpperBound;
@ -56,7 +57,6 @@ final class CommitTracker implements Runnable {
// state // state
private AtomicLong docsSinceCommit = new AtomicLong(0); private AtomicLong docsSinceCommit = new AtomicLong(0);
private AtomicInteger autoCommitCount = new AtomicInteger(0); private AtomicInteger autoCommitCount = new AtomicInteger(0);
private volatile long lastAddedTime = -1;
private final SolrCore core; private final SolrCore core;
@ -88,20 +88,38 @@ final class CommitTracker implements Runnable {
} }
/** schedule individual commits */ /** schedule individual commits */
public synchronized void scheduleCommitWithin(long commitMaxTime) { public void scheduleCommitWithin(long commitMaxTime) {
_scheduleCommitWithin(commitMaxTime); _scheduleCommitWithin(commitMaxTime);
} }
private synchronized void _scheduleCommitWithin(long commitMaxTime) { private void _scheduleCommitWithin(long commitMaxTime) {
// Check if there is a commit already scheduled for longer then this time if (commitMaxTime <= 0) return;
if (pending != null synchronized (this) {
&& pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime) { if (pending != null && pending.getDelay(TimeUnit.MILLISECONDS) <= commitMaxTime) {
pending.cancel(false); // There is already a pending commit that will happen first, so
pending = null; // nothing else to do here.
// log.info("###returning since getDelay()==" + pending.getDelay(TimeUnit.MILLISECONDS) + " less than " + commitMaxTime);
return;
} }
// schedule a new commit if (pending != null) {
if (pending == null) { // we need to schedule a commit to happen sooner than the existing one,
// so lets try to cancel the existing one first.
boolean canceled = pending.cancel(false);
if (!canceled) {
// It looks like we can't cancel... it must have just started running!
// this is possible due to thread scheduling delays and a low commitMaxTime.
// Nothing else to do since we obviously can't schedule our commit *before*
// the one that just started running (or has just completed).
// log.info("###returning since cancel failed");
return;
}
}
// log.info("###scheduling for " + commitMaxTime);
// schedule our new commit
pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS); pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
} }
} }
@ -109,14 +127,15 @@ final class CommitTracker implements Runnable {
/** /**
* Indicate that documents have been added * Indicate that documents have been added
*/ */
public boolean addedDocument(int commitWithin) { public void addedDocument(int commitWithin) {
docsSinceCommit.incrementAndGet(); // maxDocs-triggered autoCommit. Use == instead of > so we only trigger once on the way up
lastAddedTime = System.currentTimeMillis(); if (docsUpperBound > 0) {
boolean triggered = false; long docs = docsSinceCommit.incrementAndGet();
// maxDocs-triggered autoCommit if (docs == docsUpperBound + 1) {
if (docsUpperBound > 0 && (docsSinceCommit.get() > docsUpperBound)) { // reset the count here instead of run() so we don't miss other documents being added
docsSinceCommit.set(0);
_scheduleCommitWithin(DOC_COMMIT_DELAY_MS); _scheduleCommitWithin(DOC_COMMIT_DELAY_MS);
triggered = true; }
} }
// maxTime-triggered autoCommit // maxTime-triggered autoCommit
@ -124,33 +143,31 @@ final class CommitTracker implements Runnable {
if (ctime > 0) { if (ctime > 0) {
_scheduleCommitWithin(ctime); _scheduleCommitWithin(ctime);
triggered = true; }
} }
return triggered; /** Inform tracker that a commit has occurred */
}
/** Inform tracker that a commit has occurred, cancel any pending commits */
public void didCommit() { public void didCommit() {
if (pending != null) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit.set(0);
} }
/** Inform tracker that a rollback has occurred, cancel any pending commits */ /** Inform tracker that a rollback has occurred, cancel any pending commits */
public void didRollback() { public void didRollback() {
synchronized (this) {
if (pending != null) { if (pending != null) {
pending.cancel(false); pending.cancel(false);
pending = null; // let it start another one pending = null; // let it start another one
} }
docsSinceCommit.set(0); docsSinceCommit.set(0);
} }
}
/** This is the worker part for the ScheduledFuture **/ /** This is the worker part for the ScheduledFuture **/
public synchronized void run() { public void run() {
long started = System.currentTimeMillis(); synchronized (this) {
// log.info("###start commit. pending=null");
pending = null; // allow a new commit to be scheduled
}
SolrQueryRequest req = new LocalSolrQueryRequest(core, SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams()); new ModifiableSolrParams());
try { try {
@ -158,24 +175,18 @@ final class CommitTracker implements Runnable {
command.waitSearcher = waitSearcher; command.waitSearcher = waitSearcher;
command.softCommit = softCommit; command.softCommit = softCommit;
// no need for command.maxOptimizeSegments = 1; since it is not optimizing // no need for command.maxOptimizeSegments = 1; since it is not optimizing
core.getUpdateHandler().commit(command);
autoCommitCount.incrementAndGet();
} catch (Exception e) {
log.error("auto commit error...");
e.printStackTrace();
} finally {
pending = null;
req.close();
}
// check if docs have been submitted since the commit started // we increment this *before* calling commit because it was causing a race
if (lastAddedTime > started) { // in the tests (the new searcher was registered and the test proceeded
if (docsUpperBound > 0 && docsSinceCommit.get() > docsUpperBound) { // to check the commit count before we had incremented it.)
pending = scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); autoCommitCount.incrementAndGet();
} else if (timeUpperBound > 0) {
pending = scheduler.schedule(this, timeUpperBound, core.getUpdateHandler().commit(command);
TimeUnit.MILLISECONDS); } catch (Exception e) {
} SolrException.log(log, "auto commit error...", e);
} finally {
// log.info("###done committing");
req.close();
} }
} }

View File

@ -33,6 +33,7 @@ import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.NewSearcherListener.TriggerOn; import org.apache.solr.update.NewSearcherListener.TriggerOn;
import org.apache.solr.util.AbstractSolrTestCase; import org.apache.solr.util.AbstractSolrTestCase;
import org.apache.solr.util.RefCounted;
class NewSearcherListener implements SolrEventListener { class NewSearcherListener implements SolrEventListener {
@ -41,6 +42,7 @@ class NewSearcherListener implements SolrEventListener {
private volatile boolean triggered = false; private volatile boolean triggered = false;
private volatile TriggerOn lastType; private volatile TriggerOn lastType;
private volatile TriggerOn triggerOnType; private volatile TriggerOn triggerOnType;
private volatile SolrIndexSearcher newSearcher;
public NewSearcherListener() { public NewSearcherListener() {
this(TriggerOn.Both); this(TriggerOn.Both);
@ -63,6 +65,8 @@ class NewSearcherListener implements SolrEventListener {
} else if (triggerOnType == TriggerOn.Both) { } else if (triggerOnType == TriggerOn.Both) {
triggered = true; triggered = true;
} }
this.newSearcher = newSearcher;
// log.info("TEST: newSearcher event: triggered="+triggered+" newSearcher="+newSearcher);
} }
@Override @Override
@ -77,13 +81,19 @@ class NewSearcherListener implements SolrEventListener {
public void reset() { public void reset() {
triggered = false; triggered = false;
// log.info("TEST: trigger reset");
} }
boolean waitForNewSearcher(int timeout) { boolean waitForNewSearcher(int timeout) {
long timeoutTime = System.currentTimeMillis() + timeout; long timeoutTime = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < timeoutTime) { while (System.currentTimeMillis() < timeoutTime) {
if (triggered) { if (triggered) {
return true; // check if the new searcher has been registered yet
RefCounted<SolrIndexSearcher> registeredSearcherH = newSearcher.getCore().getSearcher();
SolrIndexSearcher registeredSearcher = registeredSearcherH.get();
registeredSearcherH.decref();
if (registeredSearcher == newSearcher) return true;
// log.info("TEST: waiting for searcher " + newSearcher + " to be registered. current=" + registeredSearcher);
} }
try { try {
@ -102,6 +112,19 @@ public class AutoCommitTest extends AbstractSolrTestCase {
@Override @Override
public String getSolrConfigFile() { return "solrconfig.xml"; } public String getSolrConfigFile() { return "solrconfig.xml"; }
public static void verbose(Object... args) {
if (!VERBOSE) return;
StringBuilder sb = new StringBuilder("###TEST:");
sb.append(Thread.currentThread().getName());
sb.append(':');
for (Object o : args) {
sb.append(' ');
sb.append(o.toString());
}
log.info(sb.toString());
// System.out.println(sb.toString());
}
/** /**
* Take a string and make it an iterable ContentStream * Take a string and make it an iterable ContentStream
* *
@ -345,6 +368,7 @@ public class AutoCommitTest extends AbstractSolrTestCase {
// Check it it is in the index // Check it it is in the index
assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" ); assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
assertEquals(0, softTracker.getCommitCount());
// Wait longer than the autocommit time // Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(30000)); assertTrue(trigger.waitForNewSearcher(30000));
@ -357,16 +381,22 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertQ("should find one", req("id:529") ,"//result[@numFound=1]" ); assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
// But not this one // But not this one
assertQ("should find none", req("id:530") ,"//result[@numFound=0]" ); assertQ("should find none", req("id:530") ,"//result[@numFound=0]" );
verbose("###about to delete 529");
// Delete the document // Delete the document
assertU( delI("529") ); assertU(delI("529"));
assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" ); assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" );
// Wait longer than the autocommit time // Wait longer than the autocommit time
verbose("###starting to wait for new searcher. softTracker.getCommitCount()==",softTracker.getCommitCount());
assertTrue(trigger.waitForNewSearcher(15000)); assertTrue(trigger.waitForNewSearcher(15000));
trigger.reset(); trigger.reset();
verbose("###done waiting for new searcher. softTracker.getCommitCount()==",softTracker.getCommitCount());
// what's the point of this update?
req.setContentStreams( toContentStreams( req.setContentStreams( toContentStreams(
adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) ); adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp ); handler.handleRequest( req, rsp );
assertEquals( 2, softTracker.getCommitCount() ); assertEquals( 2, softTracker.getCommitCount() );
assertQ("deleted and time has passed", req("id:529") ,"//result[@numFound=0]" ); assertQ("deleted and time has passed", req("id:529") ,"//result[@numFound=0]" );