mirror of https://github.com/apache/lucene.git
Commit SOLR-126. Omitted the typed collection change in DUH2 (trunk has been updated), and
added autocommits to the stats tracked by DUH2 git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@502328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7e18144473
commit
6d18f027e0
|
@ -68,6 +68,10 @@ New Features
|
|||
accessed using a URL structure based on that name.
|
||||
(Ryan McKinley via hossman)
|
||||
|
||||
9. SOLR-126: DirectUpdateHandler2 supports autocommitting after a specified time
|
||||
(in ms), using <autoCommit><maxTime>10000</maxTime></autoCommit>.
|
||||
(Ryan McKinley via klaas).
|
||||
|
||||
Changes in runtime behavior
|
||||
1. Highlighting using DisMax will only pick up terms from the main
|
||||
user query, not boost or filter queries (klaas).
|
||||
|
|
|
@ -62,6 +62,7 @@
|
|||
<!-- autocommit pending docs if certain criteria are met
|
||||
<autoCommit>
|
||||
<maxDocs>10000</maxDocs>
|
||||
<maxTime>1000</maxTime>
|
||||
</autoCommit>
|
||||
-->
|
||||
|
||||
|
|
|
@ -31,8 +31,12 @@ import org.apache.lucene.search.Query;
|
|||
import java.util.HashMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -246,7 +250,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
// adding document -- prep writer
|
||||
closeSearcher();
|
||||
openWriter();
|
||||
tracker.increment(1);
|
||||
tracker.addedDocument();
|
||||
} else {
|
||||
// exit prematurely
|
||||
return rc;
|
||||
|
@ -267,9 +271,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
numDocsPending.incrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
// might need to commit (wait for searcher if so)
|
||||
checkCommit(true);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -556,23 +557,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
log.info("closed " + this);
|
||||
}
|
||||
|
||||
/** Inform tracker that <code>docs</code> docs have been added. Will
|
||||
* perform commit and/or optimize if constraints are satisfied.
|
||||
*/
|
||||
protected void checkCommit() throws IOException {
|
||||
checkCommit(false);
|
||||
}
|
||||
protected void checkCommit(boolean waitSearcher) throws IOException {
|
||||
synchronized (tracker) {
|
||||
if (tracker.needCommit()) {
|
||||
CommitUpdateCommand cmd = new CommitUpdateCommand(false);
|
||||
cmd.waitSearcher = waitSearcher;
|
||||
log.info("autocommitting: " + cmd);
|
||||
commit(cmd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Helper class for tracking autoCommit state.
|
||||
*
|
||||
* Note: This is purely an implementation detail of autoCommit and will
|
||||
|
@ -581,69 +565,87 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
*
|
||||
* Note: all access must be synchronized.
|
||||
*/
|
||||
class CommitTracker {
|
||||
class CommitTracker implements Runnable
|
||||
{
|
||||
// settings, not final so we can change them in testing
|
||||
int docsUpperBound;
|
||||
long timeUpperBound;
|
||||
|
||||
// settings
|
||||
private final ConstraintTester commitTester;
|
||||
|
||||
// state
|
||||
private long timeOfCommit;
|
||||
private long docsSinceCommit;
|
||||
private boolean needCommit;
|
||||
|
||||
public CommitTracker() {
|
||||
timeOfCommit = timestamp();
|
||||
docsSinceCommit = 0;
|
||||
needCommit = false;
|
||||
|
||||
commitTester = new ConstraintTester(
|
||||
SolrConfig.config.getInt("updateHandler/autoCommit/maxDocs", -1));
|
||||
SolrCore.log.info("autocommit if " + commitTester);
|
||||
}
|
||||
|
||||
/** Indicate that <code>count</code> docs have been added. May set
|
||||
* <code>needCommit()</code> and perhaps also <code>needOptimize</code>
|
||||
*/
|
||||
public void increment(int count) {
|
||||
docsSinceCommit += count;
|
||||
if (docsSinceCommit > 0) {
|
||||
needCommit = commitTester.testConstraints(docsSinceCommit);
|
||||
}
|
||||
}
|
||||
|
||||
/** @return true if commit is needed */
|
||||
public boolean needCommit() { return needCommit; }
|
||||
private final ScheduledExecutorService scheduler =
|
||||
Executors.newScheduledThreadPool(1);
|
||||
private ScheduledFuture pending;
|
||||
|
||||
/** Inform tracker that a commit has occurred */
|
||||
public void didCommit() {
|
||||
didCommit(docsSinceCommit);
|
||||
// state
|
||||
long docsSinceCommit;
|
||||
int autoCommitCount= 0;
|
||||
long lastAddedTime = -1;
|
||||
|
||||
public CommitTracker() {
|
||||
docsSinceCommit = 0;
|
||||
pending = null;
|
||||
|
||||
docsUpperBound = SolrConfig.config.getInt("updateHandler/autoCommit/maxDocs", -1);
|
||||
timeUpperBound = SolrConfig.config.getInt("updateHandler/autoCommit/maxTime", -1);
|
||||
|
||||
SolrCore.log.info("CommitTracker: " + this);
|
||||
}
|
||||
public void didCommit(long docsCommitted) {
|
||||
timeOfCommit = timestamp();
|
||||
docsSinceCommit -= docsCommitted;
|
||||
needCommit = false;
|
||||
|
||||
}
|
||||
/** Indicate that documents have been added
|
||||
*/
|
||||
public void addedDocument() {
|
||||
docsSinceCommit++;
|
||||
lastAddedTime = System.currentTimeMillis();
|
||||
if( pending == null ) { // Don't start a new event if one is already waiting
|
||||
if( timeUpperBound > 0 ) {
|
||||
pending = scheduler.schedule( this, timeUpperBound, TimeUnit.MILLISECONDS );
|
||||
}
|
||||
else if( docsUpperBound > 0 && (docsSinceCommit > docsUpperBound) ) {
|
||||
// 1/4 second seems fast enough for anyone using maxDocs
|
||||
pending = scheduler.schedule( this, 250, TimeUnit.MILLISECONDS );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** @return milliseconds since epoch */
|
||||
private long timestamp() { return System.currentTimeMillis();}
|
||||
/** Inform tracker that a commit has occurred, cancel any pending commits */
|
||||
public void didCommit() {
|
||||
if( pending != null ) {
|
||||
pending.cancel(false);
|
||||
pending = null; // let it start another one
|
||||
}
|
||||
docsSinceCommit = 0;
|
||||
}
|
||||
|
||||
class ConstraintTester {
|
||||
private long docsUpperBound = -1;
|
||||
public ConstraintTester(long docsUpperBound) {
|
||||
this.docsUpperBound = docsUpperBound;
|
||||
/** This is the worker part for the ScheduledFuture **/
|
||||
public synchronized void run() {
|
||||
long started = System.currentTimeMillis();
|
||||
try {
|
||||
CommitUpdateCommand command = new CommitUpdateCommand( false );
|
||||
command.waitFlush = true;
|
||||
command.waitSearcher = true;
|
||||
commit( command );
|
||||
autoCommitCount++;
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.severe( "auto commit error..." );
|
||||
e.printStackTrace();
|
||||
}
|
||||
private boolean checkDocsUpper(long docs) {
|
||||
return docsUpperBound == -1 ? false : docsUpperBound <= docs;
|
||||
finally {
|
||||
pending = null;
|
||||
}
|
||||
public boolean testConstraints(long docs) {
|
||||
return checkDocsUpper(docs);
|
||||
}
|
||||
public String toString() {
|
||||
return docsUpperBound != -1 ? "docs >= " + docsUpperBound : "{no doc limit}" ;
|
||||
|
||||
// check if docs have been submitted since the commit started
|
||||
if( lastAddedTime > started ) {
|
||||
if( docsSinceCommit > docsUpperBound ) {
|
||||
pending = scheduler.schedule( this, 100, TimeUnit.MILLISECONDS );
|
||||
}
|
||||
else if( timeUpperBound > 0 ) {
|
||||
pending = scheduler.schedule( this, timeUpperBound, TimeUnit.MILLISECONDS );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// SolrInfoMBean stuff: Statistics and Module Info
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
@ -679,6 +681,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
public NamedList getStatistics() {
|
||||
NamedList lst = new SimpleOrderedMap();
|
||||
lst.add("commits", commitCommands.get());
|
||||
lst.add("autocommits", tracker.autoCommitCount);
|
||||
lst.add("optimizes", optimizeCommands.get());
|
||||
lst.add("docsPending", numDocsPending.get());
|
||||
// pset.size() not synchronized, but it should be fine to access.
|
||||
|
@ -692,7 +695,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
|
|||
lst.add("cumulative_deletesByQuery", deleteByQueryCommandsCumulative.get());
|
||||
lst.add("cumulative_errors", numErrorsCumulative.get());
|
||||
lst.add("docsDeleted", numDocsDeleted.get());
|
||||
|
||||
return lst;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.update;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.XmlUpdateRequestHandler;
|
||||
import org.apache.solr.request.ContentStream;
|
||||
import org.apache.solr.request.MapSolrParams;
|
||||
import org.apache.solr.request.SolrQueryRequestBase;
|
||||
import org.apache.solr.request.SolrQueryResponse;
|
||||
import org.apache.solr.util.AbstractSolrTestCase;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author ryan
|
||||
*
|
||||
*/
|
||||
public class AutoCommitTest extends AbstractSolrTestCase {
|
||||
|
||||
public String getSchemaFile() { return "schema.xml"; }
|
||||
public String getSolrConfigFile() { return "solrconfig.xml"; }
|
||||
|
||||
/**
|
||||
* Take a string and make it an iterable ContentStream
|
||||
*
|
||||
* This should be moved to a helper class. (it is useful for the client too!)
|
||||
*/
|
||||
public static Collection<ContentStream> toContentStreams( final String str, final String contentType )
|
||||
{
|
||||
ArrayList<ContentStream> streams = new ArrayList<ContentStream>();
|
||||
streams.add( new ContentStream() {
|
||||
public String getContentType() { return contentType; }
|
||||
public Long getSize() { return Long.valueOf( str.length() ); }
|
||||
public String getName() { return null; }
|
||||
public String getSourceInfo() { return null; }
|
||||
|
||||
public InputStream getStream() throws IOException {
|
||||
return new ByteArrayInputStream( str.getBytes() );
|
||||
}
|
||||
});
|
||||
return streams;
|
||||
}
|
||||
|
||||
public void testMaxDocs() throws Exception {
|
||||
|
||||
DirectUpdateHandler2 updater = (DirectUpdateHandler2)SolrCore.getSolrCore().getUpdateHandler();
|
||||
DirectUpdateHandler2.CommitTracker tracker = updater.tracker;
|
||||
tracker.timeUpperBound = -1;
|
||||
tracker.docsUpperBound = 5;
|
||||
|
||||
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
|
||||
handler.init( null );
|
||||
|
||||
SolrCore core = SolrCore.getSolrCore();
|
||||
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
|
||||
|
||||
// Add a single document
|
||||
SolrQueryResponse rsp = new SolrQueryResponse();
|
||||
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
|
||||
for( int i=0; i<15; i++ ) {
|
||||
req.setContentStreams( toContentStreams(
|
||||
adoc("id", "A"+i, "subject", "info" ), null ) );
|
||||
handler.handleRequest( req, rsp );
|
||||
}
|
||||
// It should not be there right away
|
||||
assertQ("shouldn't find any", req("id:A1") ,"//result[@numFound=0]" );
|
||||
assertEquals( 0, tracker.autoCommitCount );
|
||||
|
||||
// Wait longer then the autocommit time
|
||||
Thread.sleep( 500 );
|
||||
|
||||
// Now make sure we can find it
|
||||
assertQ("should find one", req("id:A1") ,"//result[@numFound=1]" );
|
||||
assertEquals( 1, tracker.autoCommitCount );
|
||||
|
||||
// Now add some more
|
||||
for( int i=0; i<15; i++ ) {
|
||||
req.setContentStreams( toContentStreams(
|
||||
adoc("id", "B"+i, "subject", "info" ), null ) );
|
||||
handler.handleRequest( req, rsp );
|
||||
}
|
||||
// It should not be there right away
|
||||
assertQ("shouldn't find any", req("id:B1") ,"//result[@numFound=0]" );
|
||||
assertEquals( 1, tracker.autoCommitCount );
|
||||
|
||||
Thread.sleep( 500 );
|
||||
assertQ("should find one", req("id:B1") ,"//result[@numFound=1]" );
|
||||
assertEquals( 2, tracker.autoCommitCount );
|
||||
}
|
||||
|
||||
public void testMaxTime() throws Exception {
|
||||
|
||||
DirectUpdateHandler2 updater = (DirectUpdateHandler2)SolrCore.getSolrCore().getUpdateHandler();
|
||||
DirectUpdateHandler2.CommitTracker tracker = updater.tracker;
|
||||
tracker.timeUpperBound = 500;
|
||||
tracker.docsUpperBound = -1;
|
||||
|
||||
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
|
||||
handler.init( null );
|
||||
|
||||
SolrCore core = SolrCore.getSolrCore();
|
||||
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
|
||||
|
||||
// Add a single document
|
||||
SolrQueryResponse rsp = new SolrQueryResponse();
|
||||
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
|
||||
req.setContentStreams( toContentStreams(
|
||||
adoc("id", "529",
|
||||
"field_t", "what's inside?",
|
||||
"subject", "info"
|
||||
), null ) );
|
||||
handler.handleRequest( req, rsp );
|
||||
|
||||
// Check it it is in the index
|
||||
assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
|
||||
|
||||
// Wait longer then the autocommit time
|
||||
Thread.sleep( 1000 );
|
||||
|
||||
// Now make sure we can find it
|
||||
assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
|
||||
|
||||
// now make the call 10 times really fast and make sure it
|
||||
// only commits once
|
||||
req.setContentStreams( toContentStreams(
|
||||
adoc("id", "500" ), null ) );
|
||||
for( int i=0;i<10; i++ ) {
|
||||
handler.handleRequest( req, rsp );
|
||||
}
|
||||
assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
|
||||
assertEquals( 1, tracker.autoCommitCount );
|
||||
|
||||
// Wait longer then the autocommit time
|
||||
Thread.sleep( 1000 );
|
||||
|
||||
assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
|
||||
assertEquals( 2, tracker.autoCommitCount );
|
||||
}
|
||||
}
|
|
@ -60,10 +60,9 @@
|
|||
<updateHandler class="solr.DirectUpdateHandler2">
|
||||
|
||||
<!-- autocommit pending docs if certain criteria are met
|
||||
NOTE: maxSecs not implemented yet
|
||||
<autoCommit>
|
||||
<maxDocs>10000</maxDocs>
|
||||
<maxSec>3600</maxSec>
|
||||
<maxTime>3600000</maxTime> <!-- one hour in milliseconds -->
|
||||
</autoCommit>
|
||||
-->
|
||||
|
||||
|
|
Loading…
Reference in New Issue