SOLR-2565: Fix bug - soft commit should not try and respect commitWithin. Also, since it drives me nuts, make CommitTracker more thread safe than it was.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1159472 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2011-08-19 01:48:31 +00:00
parent 236fb6d96c
commit a8949dce07
3 changed files with 155 additions and 61 deletions

View File

@ -21,6 +21,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore; import org.apache.solr.core.SolrCore;
@ -44,29 +46,28 @@ final class CommitTracker implements Runnable {
public final int DOC_COMMIT_DELAY_MS = 250; public final int DOC_COMMIT_DELAY_MS = 250;
// settings, not final so we can change them in testing // settings, not final so we can change them in testing
int docsUpperBound; private int docsUpperBound;
long timeUpperBound; private long timeUpperBound;
private final ScheduledExecutorService scheduler = Executors private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1); .newScheduledThreadPool(1);
private ScheduledFuture pending; private ScheduledFuture pending;
// state // state
long docsSinceCommit; private AtomicLong docsSinceCommit = new AtomicLong(0);
int autoCommitCount = 0; private AtomicInteger autoCommitCount = new AtomicInteger(0);
long lastAddedTime = -1; private volatile long lastAddedTime = -1;
private SolrCore core; private final SolrCore core;
private boolean softCommit; private final boolean softCommit;
private boolean waitSearcher; private final boolean waitSearcher;
private String name; private String name;
public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, boolean waitSearcher, boolean softCommit) { public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, boolean waitSearcher, boolean softCommit) {
this.core = core; this.core = core;
this.name = name; this.name = name;
docsSinceCommit = 0;
pending = null; pending = null;
this.docsUpperBound = docsUpperBound; this.docsUpperBound = docsUpperBound;
@ -78,7 +79,7 @@ final class CommitTracker implements Runnable {
SolrCore.log.info(name + " AutoCommit: " + this); SolrCore.log.info(name + " AutoCommit: " + this);
} }
public void close() { public synchronized void close() {
if (pending != null) { if (pending != null) {
pending.cancel(true); pending.cancel(true);
pending = null; pending = null;
@ -91,7 +92,7 @@ final class CommitTracker implements Runnable {
_scheduleCommitWithin(commitMaxTime); _scheduleCommitWithin(commitMaxTime);
} }
private void _scheduleCommitWithin(long commitMaxTime) { private synchronized void _scheduleCommitWithin(long commitMaxTime) {
// Check if there is a commit already scheduled for longer then this time // Check if there is a commit already scheduled for longer then this time
if (pending != null if (pending != null
&& pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime) { && pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime) {
@ -109,17 +110,18 @@ final class CommitTracker implements Runnable {
* Indicate that documents have been added * Indicate that documents have been added
*/ */
public boolean addedDocument(int commitWithin) { public boolean addedDocument(int commitWithin) {
docsSinceCommit++; docsSinceCommit.incrementAndGet();
lastAddedTime = System.currentTimeMillis(); lastAddedTime = System.currentTimeMillis();
boolean triggered = false; boolean triggered = false;
// maxDocs-triggered autoCommit // maxDocs-triggered autoCommit
if (docsUpperBound > 0 && (docsSinceCommit > docsUpperBound)) { if (docsUpperBound > 0 && (docsSinceCommit.get() > docsUpperBound)) {
_scheduleCommitWithin(DOC_COMMIT_DELAY_MS); _scheduleCommitWithin(DOC_COMMIT_DELAY_MS);
triggered = true; triggered = true;
} }
// maxTime-triggered autoCommit // maxTime-triggered autoCommit
long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound; long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound;
if (ctime > 0) { if (ctime > 0) {
_scheduleCommitWithin(ctime); _scheduleCommitWithin(ctime);
triggered = true; triggered = true;
@ -134,7 +136,7 @@ final class CommitTracker implements Runnable {
pending.cancel(false); pending.cancel(false);
pending = null; // let it start another one pending = null; // let it start another one
} }
docsSinceCommit = 0; docsSinceCommit.set(0);
} }
/** Inform tracker that a rollback has occurred, cancel any pending commits */ /** Inform tracker that a rollback has occurred, cancel any pending commits */
@ -143,7 +145,7 @@ final class CommitTracker implements Runnable {
pending.cancel(false); pending.cancel(false);
pending = null; // let it start another one pending = null; // let it start another one
} }
docsSinceCommit = 0; docsSinceCommit.set(0);
} }
/** This is the worker part for the ScheduledFuture **/ /** This is the worker part for the ScheduledFuture **/
@ -157,7 +159,7 @@ final class CommitTracker implements Runnable {
command.softCommit = softCommit; command.softCommit = softCommit;
// no need for command.maxOptimizeSegments = 1; since it is not optimizing // no need for command.maxOptimizeSegments = 1; since it is not optimizing
core.getUpdateHandler().commit(command); core.getUpdateHandler().commit(command);
autoCommitCount++; autoCommitCount.incrementAndGet();
} catch (Exception e) { } catch (Exception e) {
log.error("auto commit error..."); log.error("auto commit error...");
e.printStackTrace(); e.printStackTrace();
@ -168,7 +170,7 @@ final class CommitTracker implements Runnable {
// check if docs have been submitted since the commit started // check if docs have been submitted since the commit started
if (lastAddedTime > started) { if (lastAddedTime > started) {
if (docsUpperBound > 0 && docsSinceCommit > docsUpperBound) { if (docsUpperBound > 0 && docsSinceCommit.get() > docsUpperBound) {
pending = scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); pending = scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
} else if (timeUpperBound > 0) { } else if (timeUpperBound > 0) {
pending = scheduler.schedule(this, timeUpperBound, pending = scheduler.schedule(this, timeUpperBound,
@ -178,8 +180,8 @@ final class CommitTracker implements Runnable {
} }
// to facilitate testing: blocks if called during commit // to facilitate testing: blocks if called during commit
public synchronized int getCommitCount() { public int getCommitCount() {
return autoCommitCount; return autoCommitCount.get();
} }
@Override @Override
@ -194,4 +196,20 @@ final class CommitTracker implements Runnable {
return "disabled"; return "disabled";
} }
} }
public long getTimeUpperBound() {
return timeUpperBound;
}
int getDocsUpperBound() {
return docsUpperBound;
}
void setDocsUpperBound(int docsUpperBound) {
this.docsUpperBound = docsUpperBound;
}
void setTimeUpperBound(long timeUpperBound) {
this.timeUpperBound = timeUpperBound;
}
} }

View File

@ -137,15 +137,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
try { try {
boolean triggered = commitTracker.addedDocument( cmd.commitWithin ); commitTracker.addedDocument( cmd.commitWithin );
softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update
if (!triggered) {
// if we hard commit, don't soft commit
softCommitTracker.addedDocument( cmd.commitWithin );
} else {
// still inc softCommit
softCommitTracker.docsSinceCommit++;
}
if (cmd.overwrite) { if (cmd.overwrite) {
Term updateTerm; Term updateTerm;
@ -192,10 +185,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
indexWriterProvider.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId())); indexWriterProvider.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId()));
if (commitTracker.timeUpperBound > 0) { if (commitTracker.getTimeUpperBound() > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
} else if (softCommitTracker.timeUpperBound > 0) { } else if (softCommitTracker.getTimeUpperBound() > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
} }
} }
@ -224,10 +217,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
madeIt = true; madeIt = true;
if (commitTracker.timeUpperBound > 0) { if (commitTracker.getTimeUpperBound() > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
} else if (softCommitTracker.timeUpperBound > 0) { } else if (softCommitTracker.getTimeUpperBound()> 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
} }
} finally { } finally {
@ -255,10 +248,10 @@ public class DirectUpdateHandler2 extends UpdateHandler {
log.info("end_mergeIndexes"); log.info("end_mergeIndexes");
// TODO: consider soft commit issues // TODO: consider soft commit issues
if (rc == 1 && commitTracker.timeUpperBound > 0) { if (rc == 1 && commitTracker.getTimeUpperBound() > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound());
} else if (rc == 1 && softCommitTracker.timeUpperBound > 0) { } else if (rc == 1 && softCommitTracker.getTimeUpperBound() > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound());
} }
return rc; return rc;
@ -450,20 +443,20 @@ public class DirectUpdateHandler2 extends UpdateHandler {
public NamedList getStatistics() { public NamedList getStatistics() {
NamedList lst = new SimpleOrderedMap(); NamedList lst = new SimpleOrderedMap();
lst.add("commits", commitCommands.get()); lst.add("commits", commitCommands.get());
if (commitTracker.docsUpperBound > 0) { if (commitTracker.getTimeUpperBound() > 0) {
lst.add("autocommit maxDocs", commitTracker.docsUpperBound); lst.add("autocommit maxDocs", commitTracker.getTimeUpperBound());
} }
if (commitTracker.timeUpperBound > 0) { if (commitTracker.getTimeUpperBound() > 0) {
lst.add("autocommit maxTime", "" + commitTracker.timeUpperBound + "ms"); lst.add("autocommit maxTime", "" + commitTracker.getTimeUpperBound() + "ms");
} }
lst.add("autocommits", commitTracker.autoCommitCount); lst.add("autocommits", commitTracker.getCommitCount());
if (softCommitTracker.docsUpperBound > 0) { if (softCommitTracker.getTimeUpperBound() > 0) {
lst.add("soft autocommit maxDocs", softCommitTracker.docsUpperBound); lst.add("soft autocommit maxDocs", softCommitTracker.getTimeUpperBound());
} }
if (softCommitTracker.timeUpperBound > 0) { if (softCommitTracker.getTimeUpperBound() > 0) {
lst.add("soft autocommit maxTime", "" + softCommitTracker.timeUpperBound + "ms"); lst.add("soft autocommit maxTime", "" + softCommitTracker.getTimeUpperBound() + "ms");
} }
lst.add("soft autocommits", softCommitTracker.autoCommitCount); lst.add("soft autocommits", softCommitTracker.getCommitCount());
lst.add("optimizes", optimizeCommands.get()); lst.add("optimizes", optimizeCommands.get());
lst.add("rollbacks", rollbackCommands.get()); lst.add("rollbacks", rollbackCommands.get());
lst.add("expungeDeletes", expungeDeleteCommands.get()); lst.add("expungeDeletes", expungeDeleteCommands.get());

View File

@ -99,8 +99,8 @@ public class AutoCommitTest extends AbstractSolrTestCase {
DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler(); DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler();
CommitTracker tracker = updateHandler.commitTracker; CommitTracker tracker = updateHandler.commitTracker;
tracker.timeUpperBound = -1; tracker.setTimeUpperBound(-1);
tracker.docsUpperBound = 14; tracker.setDocsUpperBound(14);
core.registerNewSearcherListener(trigger); core.registerNewSearcherListener(trigger);
@ -148,8 +148,8 @@ public class AutoCommitTest extends AbstractSolrTestCase {
CommitTracker tracker = updater.commitTracker; CommitTracker tracker = updater.commitTracker;
// too low of a number can cause a slow host to commit before the test code checks that it // too low of a number can cause a slow host to commit before the test code checks that it
// isn't there... causing a failure at "shouldn't find any" // isn't there... causing a failure at "shouldn't find any"
tracker.timeUpperBound = 1000; tracker.setTimeUpperBound(1000);
tracker.docsUpperBound = -1; tracker.setDocsUpperBound(-1);
// updater.commitCallbacks.add(trigger); // updater.commitCallbacks.add(trigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
@ -221,15 +221,15 @@ public class AutoCommitTest extends AbstractSolrTestCase {
DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler(); DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler();
CommitTracker tracker = updateHandler.commitTracker; CommitTracker tracker = updateHandler.commitTracker;
tracker.timeUpperBound = -1; tracker.setTimeUpperBound(-1);
tracker.docsUpperBound = 8; tracker.setDocsUpperBound(8);
NewSearcherListener softTrigger = new NewSearcherListener(); NewSearcherListener softTrigger = new NewSearcherListener();
CommitTracker softTracker = updateHandler.softCommitTracker; CommitTracker softTracker = updateHandler.softCommitTracker;
softTracker.timeUpperBound = -1; softTracker.setTimeUpperBound(-1);
softTracker.docsUpperBound = 4; softTracker.setDocsUpperBound(4);
core.registerNewSearcherListener(softTrigger); core.registerNewSearcherListener(softTrigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
@ -282,7 +282,7 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertTrue(trigger.waitForNewSearcher(10000)); assertTrue(trigger.waitForNewSearcher(10000));
assertQ("should find 10", req("*:*") ,"//result[@numFound=10]" ); assertQ("should find 10", req("*:*") ,"//result[@numFound=10]" );
assertEquals( 1, softTracker.getCommitCount()); assertEquals( 2, softTracker.getCommitCount());
assertEquals( 1, tracker.getCommitCount()); assertEquals( 1, tracker.getCommitCount());
} }
@ -296,8 +296,8 @@ public class AutoCommitTest extends AbstractSolrTestCase {
// too low of a number can cause a slow host to commit before the test code checks that it // too low of a number can cause a slow host to commit before the test code checks that it
// isn't there... causing a failure at "shouldn't find any" // isn't there... causing a failure at "shouldn't find any"
softTracker.timeUpperBound = 2000; softTracker.setTimeUpperBound(2000);
softTracker.docsUpperBound = -1; softTracker.setDocsUpperBound(-1);
// updater.commitCallbacks.add(trigger); // updater.commitCallbacks.add(trigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
@ -362,4 +362,87 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertQ("now it should", req("id:500") ,"//result[@numFound=1]" ); assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
assertQ("but not this", req("id:531") ,"//result[@numFound=0]" ); assertQ("but not this", req("id:531") ,"//result[@numFound=0]" );
} }
public void testSoftAndHardCommitMaxTime() throws Exception {
SolrCore core = h.getCore();
NewSearcherListener trigger = new NewSearcherListener();
core.registerNewSearcherListener(trigger);
DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler();
CommitTracker hardTracker = updater.commitTracker;
CommitTracker softTracker = updater.softCommitTracker;
// too low of a number can cause a slow host to commit before the test code checks that it
// isn't there... causing a failure at "shouldn't find any"
softTracker.setTimeUpperBound(200);
softTracker.setDocsUpperBound(-1);
hardTracker.setTimeUpperBound(1000);
hardTracker.setDocsUpperBound(-1);
// updater.commitCallbacks.add(trigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
handler.init( null );
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
// Add a single document
SolrQueryResponse rsp = new SolrQueryResponse();
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
req.setContentStreams( toContentStreams(
adoc("id", "529", "field_t", "what's inside?", "subject", "info"), null ) );
trigger.reset();
handler.handleRequest( req, rsp );
// Check it it is in the index
assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(30000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
// Now make sure we can find it
assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
// But not this one
assertQ("should find none", req("id:530") ,"//result[@numFound=0]" );
// Delete the document
assertU( delI("529") );
assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(15000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
assertEquals( 2, softTracker.getCommitCount() );
assertQ("deleted and time has passed", req("id:529") ,"//result[@numFound=0]" );
// now make the call 5 times really fast and make sure it
// only commits once
req.setContentStreams( toContentStreams(
adoc("id", "500" ), null ) );
for( int i=0;i<5; i++ ) {
handler.handleRequest( req, rsp );
}
assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(15000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "531", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
// depending on timing, you might see 2 or 3 soft commits
int softCommitCnt = softTracker.getCommitCount();
assertTrue("commit cnt:" + softCommitCnt, softCommitCnt == 2
|| softCommitCnt == 3);
assertEquals(1, hardTracker.getCommitCount());
assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
assertQ("but not this", req("id:531") ,"//result[@numFound=0]" );
}
} }