diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java index d04ea465855..456c56eb230 100644 --- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java +++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java @@ -21,6 +21,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.core.SolrCore; @@ -44,29 +46,28 @@ final class CommitTracker implements Runnable { public final int DOC_COMMIT_DELAY_MS = 250; // settings, not final so we can change them in testing - int docsUpperBound; - long timeUpperBound; + private int docsUpperBound; + private long timeUpperBound; private final ScheduledExecutorService scheduler = Executors .newScheduledThreadPool(1); private ScheduledFuture pending; // state - long docsSinceCommit; - int autoCommitCount = 0; - long lastAddedTime = -1; + private AtomicLong docsSinceCommit = new AtomicLong(0); + private AtomicInteger autoCommitCount = new AtomicInteger(0); + private volatile long lastAddedTime = -1; - private SolrCore core; + private final SolrCore core; - private boolean softCommit; - private boolean waitSearcher; + private final boolean softCommit; + private final boolean waitSearcher; private String name; public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, boolean waitSearcher, boolean softCommit) { this.core = core; this.name = name; - docsSinceCommit = 0; pending = null; this.docsUpperBound = docsUpperBound; @@ -78,7 +79,7 @@ final class CommitTracker implements Runnable { SolrCore.log.info(name + " AutoCommit: " + this); } - public void close() { + public synchronized void close() { if (pending != null) { pending.cancel(true); pending = null; @@ -91,7 +92,7 @@ final class CommitTracker implements Runnable { _scheduleCommitWithin(commitMaxTime); } - private void _scheduleCommitWithin(long commitMaxTime) { + private synchronized void _scheduleCommitWithin(long commitMaxTime) { // Check if there is a commit already scheduled for longer then this time if (pending != null && pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime) { @@ -109,17 +110,18 @@ final class CommitTracker implements Runnable { * Indicate that documents have been added */ public boolean addedDocument(int commitWithin) { - docsSinceCommit++; + docsSinceCommit.incrementAndGet(); lastAddedTime = System.currentTimeMillis(); boolean triggered = false; // maxDocs-triggered autoCommit - if (docsUpperBound > 0 && (docsSinceCommit > docsUpperBound)) { + if (docsUpperBound > 0 && (docsSinceCommit.get() > docsUpperBound)) { _scheduleCommitWithin(DOC_COMMIT_DELAY_MS); triggered = true; } // maxTime-triggered autoCommit long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound; + if (ctime > 0) { _scheduleCommitWithin(ctime); triggered = true; @@ -134,7 +136,7 @@ final class CommitTracker implements Runnable { pending.cancel(false); pending = null; // let it start another one } - docsSinceCommit = 0; + docsSinceCommit.set(0); } /** Inform tracker that a rollback has occurred, cancel any pending commits */ @@ -143,7 +145,7 @@ final class CommitTracker implements Runnable { pending.cancel(false); pending = null; // let it start another one } - docsSinceCommit = 0; + docsSinceCommit.set(0); } /** This is the worker part for the ScheduledFuture **/ @@ -157,7 +159,7 @@ final class CommitTracker implements Runnable { command.softCommit = softCommit; // no need for command.maxOptimizeSegments = 1; since it is not optimizing core.getUpdateHandler().commit(command); - autoCommitCount++; + autoCommitCount.incrementAndGet(); } catch (Exception e) { log.error("auto commit error..."); e.printStackTrace(); @@ -168,7 +170,7 @@ final class CommitTracker implements Runnable { // check if docs have been submitted since the commit started if (lastAddedTime > started) { - if (docsUpperBound > 0 && docsSinceCommit > docsUpperBound) { + if (docsUpperBound > 0 && docsSinceCommit.get() > docsUpperBound) { pending = scheduler.schedule(this, 100, TimeUnit.MILLISECONDS); } else if (timeUpperBound > 0) { pending = scheduler.schedule(this, timeUpperBound, @@ -178,8 +180,8 @@ final class CommitTracker implements Runnable { } // to facilitate testing: blocks if called during commit - public synchronized int getCommitCount() { - return autoCommitCount; + public int getCommitCount() { + return autoCommitCount.get(); } @Override @@ -194,4 +196,20 @@ final class CommitTracker implements Runnable { return "disabled"; } } + + public long getTimeUpperBound() { + return timeUpperBound; + } + + int getDocsUpperBound() { + return docsUpperBound; + } + + void setDocsUpperBound(int docsUpperBound) { + this.docsUpperBound = docsUpperBound; + } + + void setTimeUpperBound(long timeUpperBound) { + this.timeUpperBound = timeUpperBound; + } } diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index e5f87271043..a37acfa3b69 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -137,15 +137,8 @@ public class DirectUpdateHandler2 extends UpdateHandler { try { - boolean triggered = commitTracker.addedDocument( cmd.commitWithin ); - - if (!triggered) { - // if we hard commit, don't soft commit - softCommitTracker.addedDocument( cmd.commitWithin ); - } else { - // still inc softCommit - softCommitTracker.docsSinceCommit++; - } + commitTracker.addedDocument( cmd.commitWithin ); + softCommitTracker.addedDocument( -1 ); // TODO: support commitWithin with soft update if (cmd.overwrite) { Term updateTerm; @@ -192,10 +185,10 @@ public class DirectUpdateHandler2 extends UpdateHandler { indexWriterProvider.getIndexWriter(core).deleteDocuments(new Term(idField.getName(), cmd.getIndexedId())); - if (commitTracker.timeUpperBound > 0) { - commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); - } else if (softCommitTracker.timeUpperBound > 0) { - softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); + if (commitTracker.getTimeUpperBound() > 0) { + commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound()); + } else if (softCommitTracker.getTimeUpperBound() > 0) { + softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound()); } } @@ -224,10 +217,10 @@ public class DirectUpdateHandler2 extends UpdateHandler { madeIt = true; - if (commitTracker.timeUpperBound > 0) { - commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); - } else if (softCommitTracker.timeUpperBound > 0) { - softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); + if (commitTracker.getTimeUpperBound() > 0) { + commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound()); + } else if (softCommitTracker.getTimeUpperBound()> 0) { + softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound()); } } finally { @@ -255,10 +248,10 @@ public class DirectUpdateHandler2 extends UpdateHandler { log.info("end_mergeIndexes"); // TODO: consider soft commit issues - if (rc == 1 && commitTracker.timeUpperBound > 0) { - commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound); - } else if (rc == 1 && softCommitTracker.timeUpperBound > 0) { - softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound); + if (rc == 1 && commitTracker.getTimeUpperBound() > 0) { + commitTracker.scheduleCommitWithin(commitTracker.getTimeUpperBound()); + } else if (rc == 1 && softCommitTracker.getTimeUpperBound() > 0) { + softCommitTracker.scheduleCommitWithin(softCommitTracker.getTimeUpperBound()); } return rc; @@ -450,20 +443,20 @@ public class DirectUpdateHandler2 extends UpdateHandler { public NamedList getStatistics() { NamedList lst = new SimpleOrderedMap(); lst.add("commits", commitCommands.get()); - if (commitTracker.docsUpperBound > 0) { - lst.add("autocommit maxDocs", commitTracker.docsUpperBound); + if (commitTracker.getTimeUpperBound() > 0) { + lst.add("autocommit maxDocs", commitTracker.getTimeUpperBound()); } - if (commitTracker.timeUpperBound > 0) { - lst.add("autocommit maxTime", "" + commitTracker.timeUpperBound + "ms"); + if (commitTracker.getTimeUpperBound() > 0) { + lst.add("autocommit maxTime", "" + commitTracker.getTimeUpperBound() + "ms"); } - lst.add("autocommits", commitTracker.autoCommitCount); - if (softCommitTracker.docsUpperBound > 0) { - lst.add("soft autocommit maxDocs", softCommitTracker.docsUpperBound); + lst.add("autocommits", commitTracker.getCommitCount()); + if (softCommitTracker.getTimeUpperBound() > 0) { + lst.add("soft autocommit maxDocs", softCommitTracker.getTimeUpperBound()); } - if (softCommitTracker.timeUpperBound > 0) { - lst.add("soft autocommit maxTime", "" + softCommitTracker.timeUpperBound + "ms"); + if (softCommitTracker.getTimeUpperBound() > 0) { + lst.add("soft autocommit maxTime", "" + softCommitTracker.getTimeUpperBound() + "ms"); } - lst.add("soft autocommits", softCommitTracker.autoCommitCount); + lst.add("soft autocommits", softCommitTracker.getCommitCount()); lst.add("optimizes", optimizeCommands.get()); lst.add("rollbacks", rollbackCommands.get()); lst.add("expungeDeletes", expungeDeleteCommands.get()); diff --git a/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java b/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java index c4b65287780..d5e2ea8d31b 100644 --- a/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java +++ b/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java @@ -99,8 +99,8 @@ public class AutoCommitTest extends AbstractSolrTestCase { DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler(); CommitTracker tracker = updateHandler.commitTracker; - tracker.timeUpperBound = -1; - tracker.docsUpperBound = 14; + tracker.setTimeUpperBound(-1); + tracker.setDocsUpperBound(14); core.registerNewSearcherListener(trigger); @@ -148,8 +148,8 @@ public class AutoCommitTest extends AbstractSolrTestCase { CommitTracker tracker = updater.commitTracker; // too low of a number can cause a slow host to commit before the test code checks that it // isn't there... causing a failure at "shouldn't find any" - tracker.timeUpperBound = 1000; - tracker.docsUpperBound = -1; + tracker.setTimeUpperBound(1000); + tracker.setDocsUpperBound(-1); // updater.commitCallbacks.add(trigger); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); @@ -221,15 +221,15 @@ public class AutoCommitTest extends AbstractSolrTestCase { DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler(); CommitTracker tracker = updateHandler.commitTracker; - tracker.timeUpperBound = -1; - tracker.docsUpperBound = 8; + tracker.setTimeUpperBound(-1); + tracker.setDocsUpperBound(8); NewSearcherListener softTrigger = new NewSearcherListener(); CommitTracker softTracker = updateHandler.softCommitTracker; - softTracker.timeUpperBound = -1; - softTracker.docsUpperBound = 4; + softTracker.setTimeUpperBound(-1); + softTracker.setDocsUpperBound(4); core.registerNewSearcherListener(softTrigger); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); @@ -282,7 +282,7 @@ public class AutoCommitTest extends AbstractSolrTestCase { assertTrue(trigger.waitForNewSearcher(10000)); assertQ("should find 10", req("*:*") ,"//result[@numFound=10]" ); - assertEquals( 1, softTracker.getCommitCount()); + assertEquals( 2, softTracker.getCommitCount()); assertEquals( 1, tracker.getCommitCount()); } @@ -296,8 +296,8 @@ public class AutoCommitTest extends AbstractSolrTestCase { // too low of a number can cause a slow host to commit before the test code checks that it // isn't there... causing a failure at "shouldn't find any" - softTracker.timeUpperBound = 2000; - softTracker.docsUpperBound = -1; + softTracker.setTimeUpperBound(2000); + softTracker.setDocsUpperBound(-1); // updater.commitCallbacks.add(trigger); XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); @@ -362,4 +362,87 @@ public class AutoCommitTest extends AbstractSolrTestCase { assertQ("now it should", req("id:500") ,"//result[@numFound=1]" ); assertQ("but not this", req("id:531") ,"//result[@numFound=0]" ); } + + public void testSoftAndHardCommitMaxTime() throws Exception { + SolrCore core = h.getCore(); + NewSearcherListener trigger = new NewSearcherListener(); + core.registerNewSearcherListener(trigger); + DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler(); + CommitTracker hardTracker = updater.commitTracker; + CommitTracker softTracker = updater.softCommitTracker; + + // too low of a number can cause a slow host to commit before the test code checks that it + // isn't there... causing a failure at "shouldn't find any" + softTracker.setTimeUpperBound(200); + softTracker.setDocsUpperBound(-1); + hardTracker.setTimeUpperBound(1000); + hardTracker.setDocsUpperBound(-1); + // updater.commitCallbacks.add(trigger); + + XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler(); + handler.init( null ); + + MapSolrParams params = new MapSolrParams( new HashMap() ); + + // Add a single document + SolrQueryResponse rsp = new SolrQueryResponse(); + SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {}; + req.setContentStreams( toContentStreams( + adoc("id", "529", "field_t", "what's inside?", "subject", "info"), null ) ); + trigger.reset(); + handler.handleRequest( req, rsp ); + + // Check it it is in the index + assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" ); + + // Wait longer than the autocommit time + assertTrue(trigger.waitForNewSearcher(30000)); + trigger.reset(); + req.setContentStreams( toContentStreams( + adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) ); + handler.handleRequest( req, rsp ); + + // Now make sure we can find it + assertQ("should find one", req("id:529") ,"//result[@numFound=1]" ); + // But not this one + assertQ("should find none", req("id:530") ,"//result[@numFound=0]" ); + + // Delete the document + assertU( delI("529") ); + assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" ); + // Wait longer than the autocommit time + assertTrue(trigger.waitForNewSearcher(15000)); + trigger.reset(); + req.setContentStreams( toContentStreams( + adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) ); + handler.handleRequest( req, rsp ); + assertEquals( 2, softTracker.getCommitCount() ); + assertQ("deleted and time has passed", req("id:529") ,"//result[@numFound=0]" ); + + // now make the call 5 times really fast and make sure it + // only commits once + req.setContentStreams( toContentStreams( + adoc("id", "500" ), null ) ); + for( int i=0;i<5; i++ ) { + handler.handleRequest( req, rsp ); + } + assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" ); + + // Wait longer than the autocommit time + assertTrue(trigger.waitForNewSearcher(15000)); + trigger.reset(); + + req.setContentStreams( toContentStreams( + adoc("id", "531", "field_t", "what's inside?", "subject", "info"), null ) ); + handler.handleRequest( req, rsp ); + + // depending on timing, you might see 2 or 3 soft commits + int softCommitCnt = softTracker.getCommitCount(); + assertTrue("commit cnt:" + softCommitCnt, softCommitCnt == 2 + || softCommitCnt == 3); + assertEquals(1, hardTracker.getCommitCount()); + + assertQ("now it should", req("id:500") ,"//result[@numFound=1]" ); + assertQ("but not this", req("id:531") ,"//result[@numFound=0]" ); + } }