SOLR-2193, SOLR-2565: The default Solr update handler has been improved so

that it uses fewer locks, keeps the IndexWriter open rather than closing it
  on each commit (ie commits no longer wait for background merges to complete), 
  works with SolrCore to provide faster 'soft' commits, and has an improved API 
  that requires less instanceof special casing. 

  You may now specify a 'soft' commit when committing. This will
  use Lucene's NRT feature to avoid guaranteeing documents are on stable storage in exchange
  for faster reopen times. There is also a new 'soft' autocommit tracker that can be
  configured.

 SolrCores now properly share IndexWriters across SolrCore reloads.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1141542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2011-06-30 13:59:59 +00:00
parent 988b53e499
commit b5be90974b
37 changed files with 994 additions and 571 deletions

View File

@ -149,6 +149,10 @@ New Features
SortFields. ValueSourceSortField instances must be rewritten before they can be used.
This is done by SolrIndexSearcher when necessary. (Chris Male).
* SOLR-2193, SOLR-2565: You may now specify a 'soft' commit when committing. This will
use Lucene's NRT feature to avoid guaranteeing documents are on stable storage in exchange
for faster reopen times. There is also a new 'soft' autocommit tracker that can be
configured. (Mark Miller, Robert Muir)
Optimizations
----------------------
@ -171,6 +175,12 @@ Optimizations
improvement is 5%, but can be much greater (up to 10x faster) when facet.offset
is very large (deep paging). (yonik)
* SOLR-2193, SOLR-2565: The default Solr update handler has been improved so
that it uses fewer locks, keeps the IndexWriter open rather than closing it
on each commit (ie commits no longer wait for background merges to complete),
works with SolrCore to provide faster 'soft' commits, and has an improved API
that requires less instanceof special casing. (Mark Miller, Robert Muir)
Bug Fixes
----------------------
@ -196,6 +206,9 @@ Bug Fixes
* SOLR-2275: fix DisMax 'mm' parsing to be tolerant of whitespace
(Erick Erickson via hossman)
* SOLR-2193, SOLR-2565: SolrCores now properly share IndexWriters across SolrCore reloads.
(Mark Miller, Robert Muir)
Other Changes
----------------------

View File

@ -288,6 +288,25 @@
</autoCommit>
-->
<!-- SoftAutoCommit
Perform a 'soft' commit automatically under certain conditions.
This commit avoids ensuring that data is synched to disk.
maxDocs - Maximum number of documents to add since the last
soft commit before automaticly triggering a new soft commit.
maxTime - Maximum amount of time in ms that is allowed to pass
since a document was added before automaticly
triggering a new soft commit.
-->
<!--
<autoSoftCommit>
<maxDocs>10000</maxDocs>
<maxTime>1000</maxTime>
</autoSoftCommit>
-->
<!-- Update Related Event Listeners
Various IndexWriter related events can trigger Listeners to

View File

@ -25,12 +25,12 @@ package org.apache.solr.common.params;
*/
public interface UpdateParams
{
/** wait till the command has flushed */
public static String WAIT_FLUSH = "waitFlush";
/** wait for the search to warm up */
public static String WAIT_SEARCHER = "waitSearcher";
public static String SOFT_COMMIT = "softCommit";
/** overwrite indexing fields */
public static String OVERWRITE = "overwrite";

View File

@ -38,6 +38,11 @@ public class AbstractSolrEventListener implements SolrEventListener {
throw new UnsupportedOperationException();
}
@Override
public void postSoftCommit() {
throw new UnsupportedOperationException();
}
public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) {
throw new UnsupportedOperationException();
}

View File

@ -250,7 +250,7 @@ public class CoreContainer
}
}
private static Properties getCoreProps(String instanceDir, String file, Properties defaults) {
static Properties getCoreProps(String instanceDir, String file, Properties defaults) {
if(file == null) file = "conf"+File.separator+ "solrcore.properties";
File corePropsFile = new File(file);
if(!corePropsFile.isAbsolute()){
@ -648,9 +648,8 @@ public class CoreContainer
schema = new IndexSchema(config, dcore.getSchemaName(), null);
}
}
String dataDir = null;
SolrCore core = new SolrCore(dcore.getName(), dataDir, config, schema, dcore);
SolrCore core = new SolrCore(dcore.getName(), null, config, schema, dcore);
return core;
}
@ -712,7 +711,7 @@ public class CoreContainer
if (core == null)
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "No such core: " + name );
SolrCore newCore = create(core.getCoreDescriptor());
SolrCore newCore = core.reload(libLoader);
register(name, newCore, false);
}

View File

@ -20,11 +20,7 @@ package org.apache.solr.core;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.DOMUtil;
import org.apache.solr.common.util.RegexFileFilter;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.handler.PingRequestHandler;
import org.apache.solr.handler.component.SearchComponent;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.QueryResponseWriter;
import org.apache.solr.response.transform.TransformerFactory;
@ -36,7 +32,6 @@ import org.apache.solr.search.ValueSourceParser;
import org.apache.solr.update.SolrIndexConfig;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.spelling.QueryConverter;
import org.apache.solr.highlight.SolrHighlighter;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.codecs.Codec;
@ -217,7 +212,9 @@ public class SolrConfig extends Config {
return new UpdateHandlerInfo(get("updateHandler/@class",null),
getInt("updateHandler/autoCommit/maxDocs",-1),
getInt("updateHandler/autoCommit/maxTime",-1),
getInt("updateHandler/commitIntervalLowerBound",-1));
getInt("updateHandler/commitIntervalLowerBound",-1),
getInt("updateHandler/autoSoftCommit/maxDocs",-1),
getInt("updateHandler/autoSoftCommit/maxTime",-1));
}
private void loadPluginInfo(Class clazz, String tag, boolean requireName, boolean requireClass) {
@ -374,7 +371,8 @@ public class SolrConfig extends Config {
public static class UpdateHandlerInfo{
public final String className;
public final int autoCommmitMaxDocs,autoCommmitMaxTime,commitIntervalLowerBound;
public final int autoCommmitMaxDocs,autoCommmitMaxTime,commitIntervalLowerBound,
autoSoftCommmitMaxDocs,autoSoftCommmitMaxTime;
/**
* @param className
@ -382,11 +380,15 @@ public class SolrConfig extends Config {
* @param autoCommmitMaxTime set -1 as default
* @param commitIntervalLowerBound set -1 as default
*/
public UpdateHandlerInfo(String className, int autoCommmitMaxDocs, int autoCommmitMaxTime, int commitIntervalLowerBound) {
public UpdateHandlerInfo(String className, int autoCommmitMaxDocs, int autoCommmitMaxTime, int commitIntervalLowerBound,
int autoSoftCommmitMaxDocs, int autoSoftCommmitMaxTime) {
this.className = className;
this.autoCommmitMaxDocs = autoCommmitMaxDocs;
this.autoCommmitMaxTime = autoCommmitMaxTime;
this.commitIntervalLowerBound = commitIntervalLowerBound;
this.autoSoftCommmitMaxDocs = autoSoftCommmitMaxDocs;
this.autoSoftCommmitMaxTime = autoSoftCommmitMaxTime;
}
}

View File

@ -32,7 +32,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.handler.admin.ShowFileRequestHandler;
import org.apache.solr.handler.component.*;
import org.apache.solr.highlight.SolrHighlighter;
import org.apache.solr.request.*;
import org.apache.solr.response.*;
import org.apache.solr.response.transform.TransformerFactory;
@ -298,6 +297,24 @@ public final class SolrCore implements SolrInfoMBean {
responseWriters.put(name, responseWriter);
}
public SolrCore reload(ClassLoader libLoader) throws IOException, ParserConfigurationException, SAXException {
// TODO - null descriptor and what if indexwriter settings have changed
SolrResourceLoader solrLoader = new SolrResourceLoader(getResourceLoader()
.getInstanceDir(), libLoader, CoreContainer.getCoreProps(
getResourceLoader().getInstanceDir(),
coreDescriptor.getPropertiesName(), coreDescriptor.getCoreProperties()));
SolrConfig config = new SolrConfig(solrLoader,
coreDescriptor.getConfigName(), null);
IndexSchema schema = new IndexSchema(config,
coreDescriptor.getSchemaName(), null);
updateHandler.incref();
SolrCore core = new SolrCore(coreDescriptor.getName(), null, config,
schema, coreDescriptor, updateHandler);
return core;
}
// gets a non-caching searcher
public SolrIndexSearcher newSearcher(String name) throws IOException {
@ -412,6 +429,32 @@ public final class SolrCore implements SolrInfoMBean {
}
}
private <T extends Object> T createReloadedUpdateHandler(String className, Class<UpdateHandler> class1, String msg, UpdateHandler updateHandler) {
Class clazz = null;
if (msg == null) msg = "SolrCore Object";
try {
clazz = getResourceLoader().findClass(className);
if (class1 != null && !class1.isAssignableFrom(clazz)) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Error Instantiating "+msg+", "+className+ " is not a " +class1.getName());
}
//most of the classes do not have constructors which takes SolrCore argument. It is recommended to obtain SolrCore by implementing SolrCoreAware.
// So invariably always it will cause a NoSuchMethodException. So iterate though the list of available constructors
Constructor justSolrCoreCon = null;
Constructor[] cons = clazz.getConstructors();
for (Constructor con : cons) {
Class[] types = con.getParameterTypes();
if(types.length == 2 && types[0] == SolrCore.class && types[1] == UpdateHandler.class){
return (T)con.newInstance(this, updateHandler);
}
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Error Instantiating "+msg+", "+className+ " could not find proper constructor for " +class1.getName(), false);
} catch (SolrException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,"Error Instantiating "+msg+", "+className+ " failed to instantiate " +class1.getName(), e, false);
}
}
public <T extends Object> T createInitInstance(PluginInfo info,Class<T> cast, String msg, String defClassName){
if(info == null) return null;
T o = createInstance(info.className == null ? defClassName : info.className,cast, msg);
@ -435,6 +478,10 @@ public final class SolrCore implements SolrInfoMBean {
return createInstance(className, UpdateHandler.class, "Update Handler");
}
private UpdateHandler createUpdateHandler(String className, UpdateHandler updateHandler) {
return createReloadedUpdateHandler(className, UpdateHandler.class, "Update Handler", updateHandler);
}
/**
*
* @param dataDir
@ -449,16 +496,33 @@ public final class SolrCore implements SolrInfoMBean {
this(null, dataDir, new SolrConfig(), schema, null );
}
/**
* Creates a new core and register it in the list of cores.
* If a core with the same name already exists, it will be stopped and replaced by this one.
*
* @param name
* @param dataDir the index directory
* @param config a solr config instance
* @param schema a solr schema instance
* @param cd
*
* @since solr 1.3
*/
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd) {
this(name, dataDir, config, schema, cd, null);
}
/**
* Creates a new core and register it in the list of cores.
* If a core with the same name already exists, it will be stopped and replaced by this one.
*@param dataDir the index directory
*@param config a solr config instance
*@param schema a solr schema instance
*@param updateHandler
*
*@since solr 1.3
*/
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd) {
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd, UpdateHandler updateHandler) {
coreDescriptor = cd;
this.setName( name );
resourceLoader = config.getResourceLoader();
@ -536,13 +600,22 @@ public final class SolrCore implements SolrInfoMBean {
String updateHandlerClass = solrConfig.getUpdateHandlerInfo().className;
updateHandler = createUpdateHandler(updateHandlerClass == null ? DirectUpdateHandler2.class.getName():updateHandlerClass);
infoRegistry.put("updateHandler", updateHandler);
if (updateHandler == null) {
this.updateHandler = createUpdateHandler(updateHandlerClass == null ? DirectUpdateHandler2.class
.getName() : updateHandlerClass);
} else {
this.updateHandler = createUpdateHandler(
updateHandlerClass == null ? DirectUpdateHandler2.class.getName()
: updateHandlerClass, updateHandler);
}
infoRegistry.put("updateHandler", this.updateHandler);
// Finally tell anyone who wants to know
resourceLoader.inform( resourceLoader );
resourceLoader.inform( this ); // last call before the latch is released.
} catch (IOException e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e, false);
} finally {
// allow firstSearcher events to fire
@ -917,6 +990,9 @@ public final class SolrCore implements SolrInfoMBean {
}
}
public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
return getSearcher(forceNew, returnSearcher, waitSearcher, false);
}
/**
* Get a {@link SolrIndexSearcher} or start the process of creating a new one.
@ -955,9 +1031,10 @@ public final class SolrCore implements SolrInfoMBean {
* @param forceNew if true, force the open of a new index searcher regardless if there is already one open.
* @param returnSearcher if true, returns a {@link SolrIndexSearcher} holder with the refcount already incremented.
* @param waitSearcher if non-null, will be filled in with a {@link Future} that will return after the new searcher is registered.
* @param updateHandlerReopens if true, the UpdateHandler will be used when reopening a {@link SolrIndexSearcher}.
* @throws IOException
*/
public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher) throws IOException {
public RefCounted<SolrIndexSearcher> getSearcher(boolean forceNew, boolean returnSearcher, final Future[] waitSearcher, boolean updateHandlerReopens) throws IOException {
// it may take some time to open an index.... we may need to make
// sure that two threads aren't trying to open one at the same time
// if it isn't necessary.
@ -1024,14 +1101,25 @@ public final class SolrCore implements SolrInfoMBean {
if (newestSearcher != null && solrConfig.reopenReaders
&& indexDirFile.equals(newIndexDirFile)) {
if (updateHandlerReopens) {
tmp = getUpdateHandler().reopenSearcher(newestSearcher.get());
} else {
IndexReader currentReader = newestSearcher.get().getIndexReader();
IndexReader newReader = currentReader.reopen();
IndexReader newReader;
newReader = currentReader.reopen();
if (newReader == currentReader) {
currentReader.incRef();
}
tmp = new SolrIndexSearcher(this, schema, "main", newReader, true, true);
}
} else {
IndexReader reader = getIndexReaderFactory().newReader(getDirectoryFactory().open(newIndexDir), true);
tmp = new SolrIndexSearcher(this, schema, "main", reader, true, true);

View File

@ -32,6 +32,8 @@ public interface SolrEventListener extends NamedListInitializedPlugin{
public void postCommit();
public void postSoftCommit();
/** The searchers passed here are only guaranteed to be valid for the duration
* of this method call, so care should be taken not to spawn threads or asynchronous
* tasks with references to these searchers.

View File

@ -105,13 +105,13 @@ class JsonLoader extends ContentStreamLoader {
}
else if( v.equals( XmlUpdateRequestHandler.COMMIT ) ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false );
cmd.waitFlush = cmd.waitSearcher = true;
cmd.waitSearcher = true;
parseCommitOptions( cmd );
processor.processCommit( cmd );
}
else if( v.equals( XmlUpdateRequestHandler.OPTIMIZE ) ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, true );
cmd.waitFlush = cmd.waitSearcher = true;
cmd.waitSearcher = true;
parseCommitOptions( cmd );
processor.processCommit( cmd );
}
@ -207,9 +207,6 @@ class JsonLoader extends ContentStreamLoader {
if( XmlUpdateRequestHandler.WAIT_SEARCHER.equals( key ) ) {
cmd.waitSearcher = parser.getBoolean();
}
else if( XmlUpdateRequestHandler.WAIT_FLUSH.equals( key ) ) {
cmd.waitFlush = parser.getBoolean();
}
else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown key: "+key+" ["+parser.getPosition()+"]" );
}

View File

@ -31,7 +31,6 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.BinaryQueryResponseWriter;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.commons.io.IOUtils;
@ -852,12 +851,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
***/
}
}
if (core.getUpdateHandler() instanceof DirectUpdateHandler2) {
((DirectUpdateHandler2) core.getUpdateHandler()).forceOpenWriter();
} else {
LOG.warn("The update handler being used is not an instance or sub-class of DirectUpdateHandler2. " +
"Replicate on Startup cannot work.");
}
// reboot the writer on the new index
core.getUpdateHandler().newIndexWriter();
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup", e);
@ -969,6 +965,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) { /*no op*/}
@Override
public void postSoftCommit() {
}
};
}

View File

@ -66,8 +66,8 @@ public class RequestHandlerUtils
if( optimize || commit || force ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, optimize );
cmd.waitFlush = params.getBool( UpdateParams.WAIT_FLUSH, cmd.waitFlush );
cmd.waitSearcher = params.getBool( UpdateParams.WAIT_SEARCHER, cmd.waitSearcher );
cmd.softCommit = params.getBool( UpdateParams.SOFT_COMMIT, cmd.softCommit );
cmd.expungeDeletes = params.getBool( UpdateParams.EXPUNGE_DELETES, cmd.expungeDeletes);
cmd.maxOptimizeSegments = params.getInt(UpdateParams.MAX_OPTIMIZE_SEGMENTS, cmd.maxOptimizeSegments);
req.getCore().getUpdateHandler().commit( cmd );
@ -100,8 +100,8 @@ public class RequestHandlerUtils
if( optimize || commit || force ) {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, optimize );
cmd.waitFlush = params.getBool( UpdateParams.WAIT_FLUSH, cmd.waitFlush );
cmd.waitSearcher = params.getBool( UpdateParams.WAIT_SEARCHER, cmd.waitSearcher );
cmd.softCommit = params.getBool( UpdateParams.SOFT_COMMIT, cmd.softCommit );
cmd.expungeDeletes = params.getBool( UpdateParams.EXPUNGE_DELETES, cmd.expungeDeletes);
cmd.maxOptimizeSegments = params.getInt(UpdateParams.MAX_OPTIMIZE_SEGMENTS, cmd.maxOptimizeSegments);
processor.processCommit( cmd );

View File

@ -24,17 +24,15 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FileUtils;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.FileUtils;
import org.apache.solr.core.SolrCore;
import static org.apache.solr.handler.ReplicationHandler.*;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,9 +61,6 @@ import java.util.zip.InflaterInputStream;
public class SnapPuller {
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
private static final List<Map<String,Object>> EMPTY_LIST_OF_MAPS
= Collections.emptyList();
private final String masterUrl;
private final ReplicationHandler replicationHandler;
@ -475,21 +470,15 @@ public class SnapPuller {
}
private void doCommit() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
try {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
cmd.waitFlush = true;
cmd.waitSearcher = true;
solrCore.getUpdateHandler().commit(cmd);
if (solrCore.getUpdateHandler() instanceof DirectUpdateHandler2) {
LOG.info("Force open index writer to make sure older index files get deleted");
DirectUpdateHandler2 handler = (DirectUpdateHandler2) solrCore.getUpdateHandler();
handler.forceOpenWriter();
// reboot the writer on the new index and get a new searcher
solrCore.getUpdateHandler().newIndexWriter();
solrCore.getSearcher(true, false, null);
replicationHandler.refreshCommitpoint();
} else {
LOG.warn("The update handler is not an instance or sub-class of DirectUpdateHandler2. " +
"ReplicationHandler may not be able to cleanup un-used index files.");
}
} finally {
req.close();
}
@ -605,6 +594,7 @@ public class SnapPuller {
+ " to: " + indexFileInIndex , e);
}
}
if (!success) {
for (String f : copiedfiles) {
File indexFile = new File(indexDir, f);
@ -715,11 +705,10 @@ public class SnapPuller {
*/
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
if (confFilesToDownload == null || confFilesToDownload.isEmpty())
return EMPTY_LIST_OF_MAPS;
return Collections.EMPTY_LIST;
//build a map with alias/name as the key
Map<String, Map<String, Object>> nameVsFile = new HashMap<String, Map<String, Object>>();
NamedList<String> names = new NamedList<String>();
NamedList names = new NamedList();
for (Map<String, Object> map : confFilesToDownload) {
//if alias is present that is the name the file may have in the slave
String name = (String) (map.get(ALIAS) == null ? map.get(NAME) : map.get(ALIAS));
@ -737,7 +726,7 @@ public class SnapPuller {
nameVsFile.remove(name); //checksums are same so the file need not be downloaded
}
}
return nameVsFile.isEmpty() ? EMPTY_LIST_OF_MAPS : nameVsFile.values();
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
/**
@ -800,25 +789,25 @@ public class SnapPuller {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesToDownload;
//create a new instance. or else iterator may fail
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp);
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getConfFilesDownloaded() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesDownloaded;
// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp);
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getFilesToDownload() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = filesToDownload;
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp);
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getFilesDownloaded() {
List<Map<String, Object>> tmp = filesDownloaded;
return tmp == null ? EMPTY_LIST_OF_MAPS : new ArrayList<Map<String, Object>>(tmp);
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
Map<String, Object> getCurrentFile() {

View File

@ -130,16 +130,13 @@ class XMLLoader extends ContentStreamLoader {
CommitUpdateCommand cmd = new CommitUpdateCommand(req, XmlUpdateRequestHandler.OPTIMIZE.equals(currTag));
boolean sawWaitSearcher = false, sawWaitFlush = false;
for (int i = 0; i < parser.getAttributeCount(); i++) {
String attrName = parser.getAttributeLocalName(i);
String attrVal = parser.getAttributeValue(i);
if (XmlUpdateRequestHandler.WAIT_FLUSH.equals(attrName)) {
cmd.waitFlush = StrUtils.parseBoolean(attrVal);
sawWaitFlush = true;
} else if (XmlUpdateRequestHandler.WAIT_SEARCHER.equals(attrName)) {
if (XmlUpdateRequestHandler.WAIT_SEARCHER.equals(attrName)) {
cmd.waitSearcher = StrUtils.parseBoolean(attrVal);
sawWaitSearcher = true;
} else if (XmlUpdateRequestHandler.SOFT_COMMIT.equals(attrName)) {
cmd.softCommit = StrUtils.parseBoolean(attrVal);
} else if (UpdateParams.MAX_OPTIMIZE_SEGMENTS.equals(attrName)) {
cmd.maxOptimizeSegments = Integer.parseInt(attrVal);
} else if (UpdateParams.EXPUNGE_DELETES.equals(attrName)) {
@ -149,11 +146,6 @@ class XMLLoader extends ContentStreamLoader {
}
}
// If waitFlush is specified and waitSearcher wasn't, then
// clear waitSearcher.
if (sawWaitFlush && !sawWaitSearcher) {
cmd.waitSearcher = false;
}
processor.processCommit(cmd);
} // end commit
else if (XmlUpdateRequestHandler.ROLLBACK.equals(currTag)) {

View File

@ -17,25 +17,15 @@
package org.apache.solr.handler;
import org.apache.solr.common.SolrException;
import javax.xml.stream.XMLInputFactory;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.XML;
import org.apache.solr.common.util.XMLErrorLogger;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.Reader;
import java.io.Writer;
import java.util.HashMap;
/**
* Add documents to solr using the STAX XML parser.
*/
@ -52,7 +42,7 @@ public class XmlUpdateRequestHandler extends ContentStreamHandlerBase {
public static final String COMMIT = "commit";
public static final String ROLLBACK = "rollback";
public static final String WAIT_SEARCHER = "waitSearcher";
public static final String WAIT_FLUSH = "waitFlush";
public static final String SOFT_COMMIT = "softCommit";
public static final String OVERWRITE = "overwrite";
public static final String COMMIT_WITHIN = "commitWithin";

View File

@ -702,6 +702,10 @@ public class SpellCheckComponent extends SearchComponent implements SolrCoreAwar
public void postCommit() {
}
@Override
public void postSoftCommit() {
}
}
public Map<String, SolrSpellChecker> getSpellCheckers() {

View File

@ -0,0 +1,194 @@
package org.apache.solr.update;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class for tracking autoCommit state.
*
* Note: This is purely an implementation detail of autoCommit and will
* definitely change in the future, so the interface should not be relied-upon
*
* Note: all access must be synchronized.
*/
final class CommitTracker implements Runnable {
protected final static Logger log = LoggerFactory.getLogger(CommitTracker.class);
// scheduler delay for maxDoc-triggered autocommits
public final int DOC_COMMIT_DELAY_MS = 250;
// settings, not final so we can change them in testing
int docsUpperBound;
long timeUpperBound;
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
private ScheduledFuture pending;
// state
long docsSinceCommit;
int autoCommitCount = 0;
long lastAddedTime = -1;
private SolrCore core;
private boolean softCommit;
private boolean waitSearcher;
public CommitTracker(SolrCore core, int docsUpperBound, int timeUpperBound, boolean waitSearcher, boolean softCommit) {
this.core = core;
docsSinceCommit = 0;
pending = null;
this.docsUpperBound = docsUpperBound;
this.timeUpperBound = timeUpperBound;
this.softCommit = softCommit;
this.waitSearcher = waitSearcher;
SolrCore.log.info("AutoCommit: " + this);
}
public void close() {
if (pending != null) {
pending.cancel(true);
pending = null;
}
scheduler.shutdown();
}
/** schedule individual commits */
public synchronized void scheduleCommitWithin(long commitMaxTime) {
_scheduleCommitWithin(commitMaxTime);
}
private void _scheduleCommitWithin(long commitMaxTime) {
// Check if there is a commit already scheduled for longer then this time
if (pending != null
&& pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime) {
pending.cancel(false);
pending = null;
}
// schedule a new commit
if (pending == null) {
pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
}
}
/**
* Indicate that documents have been added
*/
public boolean addedDocument(int commitWithin) {
docsSinceCommit++;
lastAddedTime = System.currentTimeMillis();
boolean triggered = false;
// maxDocs-triggered autoCommit
if (docsUpperBound > 0 && (docsSinceCommit > docsUpperBound)) {
_scheduleCommitWithin(DOC_COMMIT_DELAY_MS);
triggered = true;
}
// maxTime-triggered autoCommit
long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound;
if (ctime > 0) {
_scheduleCommitWithin(ctime);
triggered = true;
}
return triggered;
}
/** Inform tracker that a commit has occurred, cancel any pending commits */
public void didCommit() {
if (pending != null) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit = 0;
}
/** Inform tracker that a rollback has occurred, cancel any pending commits */
public void didRollback() {
if (pending != null) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit = 0;
}
/** This is the worker part for the ScheduledFuture **/
public synchronized void run() {
long started = System.currentTimeMillis();
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
try {
CommitUpdateCommand command = new CommitUpdateCommand(req, false);
command.waitSearcher = waitSearcher;
command.softCommit = softCommit;
// no need for command.maxOptimizeSegments = 1; since it is not optimizing
core.getUpdateHandler().commit(command);
autoCommitCount++;
} catch (Exception e) {
log.error("auto commit error...");
e.printStackTrace();
} finally {
pending = null;
req.close();
}
// check if docs have been submitted since the commit started
if (lastAddedTime > started) {
if (docsUpperBound > 0 && docsSinceCommit > docsUpperBound) {
pending = scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
} else if (timeUpperBound > 0) {
pending = scheduler.schedule(this, timeUpperBound,
TimeUnit.MILLISECONDS);
}
}
}
// to facilitate testing: blocks if called during commit
public synchronized int getCommitCount() {
return autoCommitCount;
}
@Override
public String toString() {
if (timeUpperBound > 0 || docsUpperBound > 0) {
return (timeUpperBound > 0 ? ("if uncommited for " + timeUpperBound + "ms; ")
: "")
+ (docsUpperBound > 0 ? ("if " + docsUpperBound + " uncommited docs ")
: "");
} else {
return "disabled";
}
}
}

View File

@ -24,9 +24,9 @@ import org.apache.solr.request.SolrQueryRequest;
*/
public class CommitUpdateCommand extends UpdateCommand {
public boolean optimize;
public boolean waitFlush;
public boolean waitSearcher=true;
public boolean expungeDeletes = false;
public boolean softCommit = false;
/**
* During optimize, optimize down to <= this many segments. Must be >= 1
@ -42,9 +42,9 @@ public class CommitUpdateCommand extends UpdateCommand {
@Override
public String toString() {
return "commit(optimize="+optimize
+",waitFlush="+waitFlush
+",waitSearcher="+waitSearcher
+",expungeDeletes="+expungeDeletes
+",softCommit="+softCommit
+')';
}
}

View File

@ -0,0 +1,80 @@
package org.apache.solr.update;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.core.SolrCore;
public final class DefaultIndexWriterProvider implements IndexWriterProvider {
private int refCnt = 1;
private IndexWriter indexWriter = null;
private SolrCore core;
public DefaultIndexWriterProvider(SolrCore core) {
this.core = core;
}
@Override
public synchronized IndexWriter getIndexWriter() throws IOException {
if (indexWriter == null) {
indexWriter = createMainIndexWriter("DirectUpdateHandler2", false);
}
return indexWriter;
}
@Override
public synchronized void newIndexWriter() throws IOException {
if (indexWriter != null) {
indexWriter.close();
}
indexWriter = createMainIndexWriter("DirectUpdateHandler2",
false);
}
@Override
public synchronized void decref() throws IOException {
refCnt--;
if (refCnt == 0 && indexWriter != null) {
indexWriter.close();
}
}
@Override
public synchronized void incref() {
if (refCnt == 0) {
throw new IllegalStateException("IndexWriter has been closed");
}
refCnt++;
}
@Override
public synchronized void rollbackIndexWriter() throws IOException {
indexWriter.rollback();
newIndexWriter();
}
protected SolrIndexWriter createMainIndexWriter(String name,
boolean removeAllExisting) throws IOException {
return new SolrIndexWriter(name, core.getNewIndexDir(),
core.getDirectoryFactory(), removeAllExisting, core.getSchema(),
core.getSolrConfig().mainIndexConfig, core.getDeletionPolicy(), core.getCodecProvider());
}
}

View File

@ -32,35 +32,29 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.store.Directory;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicLong;
import java.io.IOException;
import java.net.URL;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QueryParsing;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.search.SolrIndexSearcher;
/**
* TODO: add soft commitWithin support
*
* <code>DirectUpdateHandler2</code> implements an UpdateHandler where documents are added
* directly to the main Lucene index as opposed to adding to a separate smaller index.
*/
public class DirectUpdateHandler2 extends UpdateHandler {
protected IndexWriterProvider indexWriterProvider;
// stats
AtomicLong addCommands = new AtomicLong();
@ -79,66 +73,61 @@ public class DirectUpdateHandler2 extends UpdateHandler {
AtomicLong numErrorsCumulative = new AtomicLong();
// tracks when auto-commit should occur
protected final CommitTracker tracker;
// iwCommit protects internal data and open/close of the IndexWriter and
// is a mutex. Any use of the index writer should be protected by iwAccess,
// which admits multiple simultaneous acquisitions. iwAccess is
// mutually-exclusive with the iwCommit lock.
protected final Lock iwAccess, iwCommit;
protected IndexWriter writer;
protected final CommitTracker commitTracker;
protected final CommitTracker softCommitTracker;
public DirectUpdateHandler2(SolrCore core) throws IOException {
super(core);
// Pass fairness=true so commit request is not starved
// when add/updates are running hot (SOLR-2342):
ReadWriteLock rwl = new ReentrantReadWriteLock(true);
iwAccess = rwl.readLock();
iwCommit = rwl.writeLock();
indexWriterProvider = new DefaultIndexWriterProvider(core);
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs; // getInt("updateHandler/autoCommit/maxDocs", -1);
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime; // getInt("updateHandler/autoCommit/maxTime", -1);
commitTracker = new CommitTracker(core, docsUpperBound, timeUpperBound, true, false);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs; // getInt("updateHandler/autoSoftCommit/maxDocs", -1);
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
softCommitTracker = new CommitTracker(core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
}
public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) throws IOException {
super(core);
if (updateHandler instanceof DirectUpdateHandler2) {
this.indexWriterProvider = ((DirectUpdateHandler2)updateHandler).indexWriterProvider;
} else {
// the impl has changed, so we cannot use the old state - decref it
updateHandler.decref();
indexWriterProvider = new DefaultIndexWriterProvider(core);
}
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
int docsUpperBound = updateHandlerInfo.autoCommmitMaxDocs; // getInt("updateHandler/autoCommit/maxDocs", -1);
int timeUpperBound = updateHandlerInfo.autoCommmitMaxTime; // getInt("updateHandler/autoCommit/maxTime", -1);
commitTracker = new CommitTracker(core, docsUpperBound, timeUpperBound, true, false);
int softCommitDocsUpperBound = updateHandlerInfo.autoSoftCommmitMaxDocs; // getInt("updateHandler/autoSoftCommit/maxDocs", -1);
int softCommitTimeUpperBound = updateHandlerInfo.autoSoftCommmitMaxTime; // getInt("updateHandler/autoSoftCommit/maxTime", -1);
softCommitTracker = new CommitTracker(core, softCommitDocsUpperBound, softCommitTimeUpperBound, true, true);
tracker = new CommitTracker();
}
// must only be called when iwCommit lock held
private void deleteAll() throws IOException {
core.log.info(core.getLogId()+"REMOVING ALL DOCUMENTS FROM INDEX");
closeWriter();
writer = createMainIndexWriter("DirectUpdateHandler2", true);
SolrCore.log.info(core.getLogId()+"REMOVING ALL DOCUMENTS FROM INDEX");
indexWriterProvider.getIndexWriter().deleteAll();
}
// must only be called when iwCommit lock held
protected void openWriter() throws IOException {
if (writer==null) {
writer = createMainIndexWriter("DirectUpdateHandler2", false);
}
}
// must only be called when iwCommit lock held
protected void closeWriter() throws IOException {
try {
numDocsPending.set(0);
if (writer!=null) writer.close();
} finally {
// if an exception causes the writelock to not be
// released, we could try and delete it here
writer=null;
}
}
// must only be called when iwCommit lock held
protected void rollbackWriter() throws IOException {
try {
numDocsPending.set(0);
if (writer!=null) writer.rollback();
} finally {
writer = null;
}
indexWriterProvider.rollbackIndexWriter();
}
@Override
public int addDoc(AddUpdateCommand cmd) throws IOException {
IndexWriter writer = indexWriterProvider.getIndexWriter();
addCommands.incrementAndGet();
addCommandsCumulative.incrementAndGet();
int rc=-1;
@ -148,18 +137,17 @@ public class DirectUpdateHandler2 extends UpdateHandler {
cmd.overwrite = false;
}
iwAccess.lock();
try {
// We can't use iwCommit to protect internal data here, since it would
// block other addDoc calls. Hence, we synchronize to protect internal
// state. This is safe as all other state-changing operations are
// protected with iwCommit (which iwAccess excludes from this block).
synchronized (this) {
// adding document -- prep writer
openWriter();
tracker.addedDocument( cmd.commitWithin );
} // end synchronized block
try {
boolean triggered = commitTracker.addedDocument( cmd.commitWithin );
if (!triggered) {
// if we hard commit, don't soft commit
softCommitTracker.addedDocument( cmd.commitWithin );
} else {
// still inc softCommit
softCommitTracker.docsSinceCommit++;
}
// this is the only unsynchronized code in the iwAccess block, which
// should account for most of the time
@ -192,7 +180,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
rc = 1;
} finally {
iwAccess.unlock();
if (rc!=1) {
numErrors.incrementAndGet();
numErrorsCumulative.incrementAndGet();
@ -211,16 +198,12 @@ public class DirectUpdateHandler2 extends UpdateHandler {
deleteByIdCommands.incrementAndGet();
deleteByIdCommandsCumulative.incrementAndGet();
iwCommit.lock();
try {
openWriter();
writer.deleteDocuments(new Term(idField.getName(), idFieldType.toInternal(cmd.id)));
} finally {
iwCommit.unlock();
}
indexWriterProvider.getIndexWriter().deleteDocuments(new Term(idField.getName(), idFieldType.toInternal(cmd.id)));
if( tracker.timeUpperBound > 0 ) {
tracker.scheduleCommitWithin( tracker.timeUpperBound );
if (commitTracker.timeUpperBound > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound);
} else if (softCommitTracker.timeUpperBound > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound);
}
}
@ -230,7 +213,6 @@ public class DirectUpdateHandler2 extends UpdateHandler {
public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
deleteByQueryCommands.incrementAndGet();
deleteByQueryCommandsCumulative.incrementAndGet();
boolean madeIt=false;
boolean delAll=false;
try {
@ -244,23 +226,20 @@ public class DirectUpdateHandler2 extends UpdateHandler {
delAll = MatchAllDocsQuery.class == q.getClass();
iwCommit.lock();
try {
if (delAll) {
deleteAll();
} else {
openWriter();
writer.deleteDocuments(q);
}
} finally {
iwCommit.unlock();
indexWriterProvider.getIndexWriter().deleteDocuments(q);
}
madeIt=true;
madeIt = true;
if( tracker.timeUpperBound > 0 ) {
tracker.scheduleCommitWithin( tracker.timeUpperBound );
if (commitTracker.timeUpperBound > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound);
} else if (softCommitTracker.timeUpperBound > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound);
}
} finally {
if (!madeIt) {
numErrors.incrementAndGet();
@ -274,42 +253,30 @@ public class DirectUpdateHandler2 extends UpdateHandler {
mergeIndexesCommands.incrementAndGet();
int rc = -1;
iwCommit.lock();
try {
log.info("start " + cmd);
IndexReader[] readers = cmd.readers;
if (readers != null && readers.length > 0) {
openWriter();
writer.addIndexes(readers);
indexWriterProvider.getIndexWriter().addIndexes(readers);
rc = 1;
} else {
rc = 0;
}
log.info("end_mergeIndexes");
} finally {
iwCommit.unlock();
}
if (rc == 1 && tracker.timeUpperBound > 0) {
tracker.scheduleCommitWithin(tracker.timeUpperBound);
// TODO: consider soft commit issues
if (rc == 1 && commitTracker.timeUpperBound > 0) {
commitTracker.scheduleCommitWithin(commitTracker.timeUpperBound);
} else if (rc == 1 && softCommitTracker.timeUpperBound > 0) {
softCommitTracker.scheduleCommitWithin(softCommitTracker.timeUpperBound);
}
return rc;
}
public void forceOpenWriter() throws IOException {
iwCommit.lock();
try {
openWriter();
} finally {
iwCommit.unlock();
}
}
@Override
public void commit(CommitUpdateCommand cmd) throws IOException {
IndexWriter writer = indexWriterProvider.getIndexWriter();
if (cmd.optimize) {
optimizeCommands.incrementAndGet();
} else {
@ -323,38 +290,50 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
boolean error=true;
iwCommit.lock();
try {
log.info("start "+cmd);
if (cmd.optimize) {
openWriter();
writer.optimize(cmd.maxOptimizeSegments);
} else if (cmd.expungeDeletes) {
openWriter();
writer.expungeDeletes();
}
closeWriter();
if (!cmd.softCommit) {
writer.commit();
callPostCommitCallbacks();
} else {
callPostSoftCommitCallbacks();
}
if (cmd.optimize) {
callPostOptimizeCallbacks();
}
// open a new searcher in the sync block to avoid opening it
// after a deleteByQuery changed the index, or in between deletes
// and adds of another commit being done.
if (cmd.softCommit) {
core.getSearcher(true,false,waitSearcher, true);
} else {
core.getSearcher(true,false,waitSearcher);
}
// reset commit tracking
tracker.didCommit();
if (cmd.softCommit) {
softCommitTracker.didCommit();
} else {
commitTracker.didCommit();
}
log.info("end_commit_flush");
error=false;
}
finally {
iwCommit.unlock();
addCommands.set(0);
deleteByIdCommands.set(0);
deleteByQueryCommands.set(0);
@ -374,16 +353,36 @@ public class DirectUpdateHandler2 extends UpdateHandler {
}
}
@Override
public SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException {
IndexReader currentReader = previousSearcher.getIndexReader();
IndexReader newReader;
newReader = currentReader.reopen(indexWriterProvider.getIndexWriter(), true);
if (newReader == currentReader) {
currentReader.incRef();
}
return new SolrIndexSearcher(core, schema, "main", newReader, true, true);
}
@Override
public void newIndexWriter() throws IOException {
indexWriterProvider.newIndexWriter();
}
/**
* @since Solr 1.4
*/
@Override
public void rollback(RollbackUpdateCommand cmd) throws IOException {
rollbackCommands.incrementAndGet();
boolean error=true;
iwCommit.lock();
try {
log.info("start "+cmd);
@ -392,14 +391,14 @@ public class DirectUpdateHandler2 extends UpdateHandler {
//callPostRollbackCallbacks();
// reset commit tracking
tracker.didRollback();
commitTracker.didRollback();
softCommitTracker.didRollback();
log.info("end_rollback");
error=false;
}
finally {
iwCommit.unlock();
addCommandsCumulative.set(
addCommandsCumulative.get() - addCommands.getAndSet( 0 ) );
deleteByIdCommandsCumulative.set(
@ -414,162 +413,16 @@ public class DirectUpdateHandler2 extends UpdateHandler {
@Override
public void close() throws IOException {
log.info("closing " + this);
iwCommit.lock();
try{
// cancel any pending operations
if( tracker.pending != null ) {
tracker.pending.cancel( true );
tracker.pending = null;
}
tracker.scheduler.shutdown();
closeWriter();
} finally {
iwCommit.unlock();
}
commitTracker.close();
softCommitTracker.close();
numDocsPending.set(0);
indexWriterProvider.decref();
log.info("closed " + this);
}
/** Helper class for tracking autoCommit state.
*
* Note: This is purely an implementation detail of autoCommit and will
* definitely change in the future, so the interface should not be
* relied-upon
*
* Note: all access must be synchronized.
*/
class CommitTracker implements Runnable
{
// scheduler delay for maxDoc-triggered autocommits
public final int DOC_COMMIT_DELAY_MS = 250;
// settings, not final so we can change them in testing
int docsUpperBound;
long timeUpperBound;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
private ScheduledFuture pending;
// state
long docsSinceCommit;
int autoCommitCount = 0;
long lastAddedTime = -1;
public CommitTracker() {
docsSinceCommit = 0;
pending = null;
docsUpperBound = core.getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs; //getInt("updateHandler/autoCommit/maxDocs", -1);
timeUpperBound = core.getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime; //getInt("updateHandler/autoCommit/maxTime", -1);
SolrCore.log.info("AutoCommit: " + this);
}
/** schedule individual commits */
public synchronized void scheduleCommitWithin(long commitMaxTime)
{
_scheduleCommitWithin( commitMaxTime );
}
private void _scheduleCommitWithin(long commitMaxTime)
{
// Check if there is a commit already scheduled for longer then this time
if( pending != null &&
pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime )
{
pending.cancel(false);
pending = null;
}
// schedule a new commit
if( pending == null ) {
pending = scheduler.schedule( this, commitMaxTime, TimeUnit.MILLISECONDS );
}
}
/** Indicate that documents have been added
*/
public void addedDocument( int commitWithin ) {
docsSinceCommit++;
lastAddedTime = System.currentTimeMillis();
// maxDocs-triggered autoCommit
if( docsUpperBound > 0 && (docsSinceCommit > docsUpperBound) ) {
_scheduleCommitWithin( DOC_COMMIT_DELAY_MS );
}
// maxTime-triggered autoCommit
long ctime = (commitWithin>0) ? commitWithin : timeUpperBound;
if( ctime > 0 ) {
_scheduleCommitWithin( ctime );
}
}
/** Inform tracker that a commit has occurred, cancel any pending commits */
public void didCommit() {
if( pending != null ) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit = 0;
}
/** Inform tracker that a rollback has occurred, cancel any pending commits */
public void didRollback() {
if( pending != null ) {
pending.cancel(false);
pending = null; // let it start another one
}
docsSinceCommit = 0;
}
/** This is the worker part for the ScheduledFuture **/
public synchronized void run() {
long started = System.currentTimeMillis();
SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
try {
CommitUpdateCommand command = new CommitUpdateCommand(req, false );
command.waitFlush = true;
command.waitSearcher = true;
//no need for command.maxOptimizeSegments = 1; since it is not optimizing
commit( command );
autoCommitCount++;
}
catch (Exception e) {
log.error( "auto commit error..." );
e.printStackTrace();
}
finally {
pending = null;
req.close();
}
// check if docs have been submitted since the commit started
if( lastAddedTime > started ) {
if( docsUpperBound > 0 && docsSinceCommit > docsUpperBound ) {
pending = scheduler.schedule( this, 100, TimeUnit.MILLISECONDS );
}
else if( timeUpperBound > 0 ) {
pending = scheduler.schedule( this, timeUpperBound, TimeUnit.MILLISECONDS );
}
}
}
// to facilitate testing: blocks if called during commit
public synchronized int getCommitCount() { return autoCommitCount; }
@Override
public String toString() {
if(timeUpperBound > 0 || docsUpperBound > 0) {
return
(timeUpperBound > 0 ? ("if uncommited for " + timeUpperBound + "ms; ") : "") +
(docsUpperBound > 0 ? ("if " + docsUpperBound + " uncommited docs ") : "");
} else {
return "disabled";
}
}
}
/////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info
@ -606,13 +459,20 @@ public class DirectUpdateHandler2 extends UpdateHandler {
public NamedList getStatistics() {
NamedList lst = new SimpleOrderedMap();
lst.add("commits", commitCommands.get());
if (tracker.docsUpperBound > 0) {
lst.add("autocommit maxDocs", tracker.docsUpperBound);
if (commitTracker.docsUpperBound > 0) {
lst.add("autocommit maxDocs", commitTracker.docsUpperBound);
}
if (tracker.timeUpperBound > 0) {
lst.add("autocommit maxTime", "" + tracker.timeUpperBound + "ms");
if (commitTracker.timeUpperBound > 0) {
lst.add("autocommit maxTime", "" + commitTracker.timeUpperBound + "ms");
}
lst.add("autocommits", tracker.autoCommitCount);
lst.add("autocommits", commitTracker.autoCommitCount);
if (softCommitTracker.docsUpperBound > 0) {
lst.add("soft autocommit maxDocs", softCommitTracker.docsUpperBound);
}
if (softCommitTracker.timeUpperBound > 0) {
lst.add("soft autocommit maxTime", "" + softCommitTracker.timeUpperBound + "ms");
}
lst.add("soft autocommits", softCommitTracker.autoCommitCount);
lst.add("optimizes", optimizeCommands.get());
lst.add("rollbacks", rollbackCommands.get());
lst.add("expungeDeletes", expungeDeleteCommands.get());
@ -634,4 +494,22 @@ public class DirectUpdateHandler2 extends UpdateHandler {
public String toString() {
return "DirectUpdateHandler2" + getStatistics();
}
public IndexWriterProvider getIndexWriterProvider() {
return indexWriterProvider;
}
@Override
public void decref() {
try {
indexWriterProvider.decref();
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e, false);
}
}
@Override
public void incref() {
indexWriterProvider.incref();
}
}

View File

@ -0,0 +1,36 @@
package org.apache.solr.update;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
public interface IndexWriterProvider {
public void newIndexWriter() throws IOException;
public IndexWriter getIndexWriter() throws IOException;
public void decref() throws IOException;
public void incref();
public void rollbackIndexWriter() throws IOException;
}

View File

@ -35,6 +35,7 @@ import java.io.PrintStream;
import java.text.DateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
/**
* An IndexWriter that is configured via Solr config mechanisms.
@ -44,6 +45,9 @@ import java.util.Locale;
public class SolrIndexWriter extends IndexWriter {
private static Logger log = LoggerFactory.getLogger(SolrIndexWriter.class);
// These should *only* be used for debugging or monitoring purposes
public static final AtomicLong numOpens = new AtomicLong();
public static final AtomicLong numCloses = new AtomicLong();
String name;
private PrintStream infoStream;
@ -90,6 +94,7 @@ public class SolrIndexWriter extends IndexWriter {
this.name = name;
setInfoStream(config);
numOpens.incrementAndGet();
}
private void setInfoStream(SolrIndexConfig config)
@ -147,6 +152,7 @@ public class SolrIndexWriter extends IndexWriter {
}
} finally {
isClosed = true;
numCloses.incrementAndGet();
}
}
@ -163,6 +169,7 @@ public class SolrIndexWriter extends IndexWriter {
protected void finalize() throws Throwable {
try {
if(!isClosed){
assert false : "SolrIndexWriter was not closed prior to finalize()";
log.error("SolrIndexWriter was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
close();
}

View File

@ -56,8 +56,19 @@ public abstract class UpdateHandler implements SolrInfoMBean {
protected final FieldType idFieldType;
protected Vector<SolrEventListener> commitCallbacks = new Vector<SolrEventListener>();
protected Vector<SolrEventListener> softCommitCallbacks = new Vector<SolrEventListener>();
protected Vector<SolrEventListener> optimizeCallbacks = new Vector<SolrEventListener>();
/**
* Called when a SolrCore using this UpdateHandler is closed.
*/
public abstract void decref();
/**
* Called when this UpdateHandler is shared with another SolrCore.
*/
public abstract void incref();
private void parseEventListeners() {
final Class<SolrEventListener> clazz = SolrEventListener.class;
final String label = "Event Listener";
@ -81,6 +92,12 @@ public abstract class UpdateHandler implements SolrInfoMBean {
}
}
protected void callPostSoftCommitCallbacks() {
for (SolrEventListener listener : softCommitCallbacks) {
listener.postSoftCommit();
}
}
protected void callPostOptimizeCallbacks() {
for (SolrEventListener listener : optimizeCallbacks) {
listener.postCommit();
@ -95,10 +112,6 @@ public abstract class UpdateHandler implements SolrInfoMBean {
parseEventListeners();
}
protected SolrIndexWriter createMainIndexWriter(String name, boolean removeAllExisting) throws IOException {
return new SolrIndexWriter(name,core.getNewIndexDir(), core.getDirectoryFactory(), removeAllExisting, schema, core.getSolrConfig().mainIndexConfig, core.getDeletionPolicy(), core.getCodecProvider());
}
protected final Term idTerm(String readableId) {
// to correctly create the Term, the string needs to be run
// through the Analyzer for that field.
@ -127,6 +140,23 @@ public abstract class UpdateHandler implements SolrInfoMBean {
return idFieldType.storedToIndexed(f);
}
/**
* Allows the UpdateHandler to create the SolrIndexSearcher after it
* has issued a 'softCommit'.
*
* @param previousSearcher
* @throws IOException
*/
public abstract SolrIndexSearcher reopenSearcher(SolrIndexSearcher previousSearcher) throws IOException;
/**
* Called when the Writer should be opened again - eg when replication replaces
* all of the index files.
*
* @throws IOException
*/
public abstract void newIndexWriter() throws IOException;
public abstract int addDoc(AddUpdateCommand cmd) throws IOException;
public abstract void delete(DeleteUpdateCommand cmd) throws IOException;
@ -187,6 +217,18 @@ public abstract class UpdateHandler implements SolrInfoMBean {
commitCallbacks.add( listener );
}
/**
* NOTE: this function is not thread safe. However, it is safe to call within the
* <code>inform( SolrCore core )</code> function for <code>SolrCoreAware</code> classes.
* Outside <code>inform</code>, this could potentially throw a ConcurrentModificationException
*
* @see SolrCoreAware
*/
public void registerSoftCommitCallback( SolrEventListener listener )
{
softCommitCallbacks.add( listener );
}
/**
* NOTE: this function is not thread safe. However, it is safe to call within the
* <code>inform( SolrCore core )</code> function for <code>SolrCoreAware</code> classes.

View File

@ -129,8 +129,7 @@ public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
if( fmt != null ) {
log.info( fmt );
writer.write( String.format( fmt,
params.getBool( UpdateParams.WAIT_SEARCHER, false )+"",
params.getBool( UpdateParams.WAIT_FLUSH, false )+"") );
params.getBool( UpdateParams.WAIT_SEARCHER, false )+"") );
}
}

View File

@ -59,7 +59,6 @@ public abstract class AbstractUpdateRequest extends SolrRequest {
else if( action == ACTION.COMMIT ) {
params.set( UpdateParams.COMMIT, "true" );
}
params.set( UpdateParams.WAIT_FLUSH, String.valueOf(waitFlush));
params.set( UpdateParams.WAIT_SEARCHER, String.valueOf(waitSearcher));
return this;
}
@ -107,10 +106,6 @@ public abstract class AbstractUpdateRequest extends SolrRequest {
return res;
}
public boolean isWaitFlush() {
return params != null && params.getBool(UpdateParams.WAIT_FLUSH, false);
}
public boolean isWaitSearcher() {
return params != null && params.getBool(UpdateParams.WAIT_SEARCHER, false);
}
@ -122,10 +117,6 @@ public abstract class AbstractUpdateRequest extends SolrRequest {
return null;
}
public void setWaitFlush(boolean waitFlush) {
setParam( UpdateParams.WAIT_FLUSH, waitFlush+"" );
}
public void setWaitSearcher(boolean waitSearcher) {
setParam( UpdateParams.WAIT_SEARCHER, waitSearcher+"" );
}

View File

@ -155,7 +155,6 @@ public class UpdateRequest extends AbstractUpdateRequest {
else if( action == ACTION.COMMIT ) {
params.set( UpdateParams.COMMIT, "true" );
}
params.set( UpdateParams.WAIT_FLUSH, waitFlush+"" );
params.set( UpdateParams.WAIT_SEARCHER, waitSearcher+"" );
return this;
}

View File

@ -41,6 +41,7 @@ import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.DirectSolrConnection;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.util.TestHarness;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -64,6 +65,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
@BeforeClass
public static void beforeClassSolrTestCase() throws Exception {
startTrackingSearchers();
startTrackingWriters();
ignoreException("ignore_exception");
}
@ -72,6 +74,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
deleteCore();
resetExceptionIgnores();
endTrackingSearchers();
endTrackingWriters();
}
// SOLR-2279: hack to shut these directories down
@ -132,6 +135,24 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
}
}
static long numWriterOpens;
static long numWriterCloses;
public static void startTrackingWriters() {
numOpens = SolrIndexWriter.numOpens.get();
numCloses = SolrIndexWriter.numCloses.get();
}
public static void endTrackingWriters() {
long endNumOpens = SolrIndexWriter.numOpens.get();
long endNumCloses = SolrIndexWriter.numCloses.get();
if (endNumOpens-numOpens != endNumCloses-numCloses) {
String msg = "ERROR: SolrIndexWriter opens=" + (endNumOpens-numWriterOpens) + " closes=" + (endNumCloses-numWriterCloses);
log.error(msg);
fail(msg);
}
}
/** Causes an exception matching the regex pattern to not be logged. */
public static void ignoreException(String pattern) {
if (SolrException.ignorePatterns == null)

View File

@ -97,11 +97,13 @@ public abstract class AbstractSolrTestCase extends LuceneTestCase {
@BeforeClass
public static void beforeClassAbstractSolrTestCase() throws Exception {
SolrTestCaseJ4.startTrackingSearchers();
SolrTestCaseJ4.startTrackingWriters();
}
@AfterClass
public static void afterClassAbstractSolrTestCase() throws Exception {
SolrTestCaseJ4.endTrackingSearchers();
SolrTestCaseJ4.endTrackingWriters();
}
/**

View File

@ -28,9 +28,9 @@ import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.AppendedSolrParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.DefaultSolrParams;
@ -49,7 +49,7 @@ import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.DocIterator;
import org.apache.solr.search.DocList;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.junit.BeforeClass;
@ -120,9 +120,8 @@ public class BasicFunctionalityTest extends SolrTestCaseJ4 {
// test merge factor picked up
SolrCore core = h.getCore();
SolrIndexWriter writer = new SolrIndexWriter("testWriter",core.getNewIndexDir(), core.getDirectoryFactory(), false, core.getSchema(), core.getSolrConfig().mainIndexConfig, core.getDeletionPolicy(), core.getCodecProvider());
assertEquals("Mergefactor was not picked up", ((LogMergePolicy) writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
writer.close();
IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
lrf.args.put(CommonParams.VERSION,"2.2");
assertQ("test query on empty index",

View File

@ -17,11 +17,11 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -45,11 +45,10 @@ public class BasicZkTest extends AbstractZkTestCase {
// test merge factor picked up
SolrCore core = h.getCore();
SolrIndexWriter writer = new SolrIndexWriter("testWriter", core
.getNewIndexDir(), core.getDirectoryFactory(), false, core.getSchema(),
core.getSolrConfig().mainIndexConfig, core.getDeletionPolicy(), core.getCodecProvider());
IndexWriter writer = ((DirectUpdateHandler2)core.getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
assertEquals("Mergefactor was not picked up", ((LogMergePolicy)writer.getConfig().getMergePolicy()).getMergeFactor(), 8);
writer.close();
lrf.args.put(CommonParams.VERSION, "2.2");
assertQ("test query on empty index", req("qlkciyopsbgzyvkylsjhchghjrdf"),

View File

@ -68,13 +68,12 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
@BeforeClass
public static void beforeClass() throws Exception {
initCore();
}
@Override
public void setUp() throws Exception {
super.setUp();
createTempDir();
System.setProperty("zkClientTimeout", "3000");
zkDir = dataDir.getAbsolutePath() + File.separator
@ -95,7 +94,7 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
dataDir3 = new File(dataDir + File.separator + "data3");
dataDir3.mkdirs();
dataDir4 = new File(dataDir + File.separator + "data3");
dataDir4 = new File(dataDir + File.separator + "data4");
dataDir4.mkdirs();
// set some system properties for use by tests
@ -103,31 +102,21 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
System.setProperty("solr.test.sys.prop2", "proptwo");
System.setProperty("hostPort", "1661");
CoreContainer.Initializer init1 = new CoreContainer.Initializer() {
{
this.dataDir = CloudStateUpdateTest.this.dataDir1.getAbsolutePath();
}
};
CoreContainer.Initializer init1 = new CoreContainer.Initializer();
System.setProperty("solr.data.dir", CloudStateUpdateTest.this.dataDir1.getAbsolutePath());
container1 = init1.initialize();
System.clearProperty("hostPort");
System.setProperty("hostPort", "1662");
init2 = new CoreContainer.Initializer() {
{
this.dataDir = CloudStateUpdateTest.this.dataDir2.getAbsolutePath();
}
};
init2 = new CoreContainer.Initializer();
System.setProperty("solr.data.dir", CloudStateUpdateTest.this.dataDir2.getAbsolutePath());
container2 = init2.initialize();
System.clearProperty("hostPort");
System.setProperty("hostPort", "1663");
CoreContainer.Initializer init3 = new CoreContainer.Initializer() {
{
this.dataDir = CloudStateUpdateTest.this.dataDir3.getAbsolutePath();
}
};
CoreContainer.Initializer init3 = new CoreContainer.Initializer();
System.setProperty("solr.data.dir", CloudStateUpdateTest.this.dataDir3.getAbsolutePath());
container3 = init3.initialize();
System.clearProperty("hostPort");
@ -153,6 +142,7 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
dcore.setDataDir(dataDir4.getAbsolutePath());
SolrCore core = container1.create(dcore);
container1.register(core, false);
ZkController zkController2 = container2.getZkController();
@ -226,7 +216,7 @@ public class CloudStateUpdateTest extends SolrTestCaseJ4 {
assertTrue(container1.getZkController().getCloudState().liveNodesContain(
container2.getZkController().getNodeName()));
core.close();
}
@Override

View File

@ -41,6 +41,11 @@ public class MockEventListener implements SolrEventListener {
/* NOOP */
}
@Override
public void postSoftCommit() {
/* NOOP */
}
public void newSearcher(SolrIndexSearcher newSearcher,
SolrIndexSearcher currentSearcher) {
/* NOOP */

View File

@ -116,22 +116,9 @@ public class TestConfig extends SolrTestCaseJ4 {
@Test
public void testTermIndexInterval() throws Exception {
class ExposeWriterHandler extends DirectUpdateHandler2 {
public ExposeWriterHandler() throws IOException {
super(h.getCore());
}
public IndexWriter getWriter() throws IOException {
forceOpenWriter();
return writer;
}
}
ExposeWriterHandler duh = new ExposeWriterHandler();
IndexWriter writer = duh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
int interval = writer.getConfig().getTermIndexInterval();
assertEquals(256, interval);
duh.close();
}
@Test

View File

@ -17,8 +17,6 @@ package org.apache.solr.core;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
@ -35,21 +33,9 @@ public class TestLegacyMergeSchedulerPolicyConfig extends SolrTestCaseJ4 {
@Test
public void testLegacy() throws Exception {
ExposeWriterHandler duh = new ExposeWriterHandler();
IndexWriter writer = duh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
assertTrue(writer.getConfig().getMergePolicy().getClass().getName().equals(LogDocMergePolicy.class.getName()));
assertTrue(writer.getConfig().getMergeScheduler().getClass().getName().equals(SerialMergeScheduler.class.getName()));
duh.close();
}
class ExposeWriterHandler extends DirectUpdateHandler2 {
public ExposeWriterHandler() throws IOException {
super(h.getCore());
}
public IndexWriter getWriter() throws IOException {
forceOpenWriter();
return writer;
}
}
}

View File

@ -36,30 +36,15 @@ public class TestPropInject extends AbstractSolrTestCase {
return "solrconfig-propinject.xml";
}
class ExposeWriterHandler extends DirectUpdateHandler2 {
public ExposeWriterHandler() throws IOException {
super(h.getCore());
}
public IndexWriter getWriter() throws IOException {
forceOpenWriter();
return writer;
}
}
public void testMergePolicy() throws Exception {
ExposeWriterHandler uh = new ExposeWriterHandler();
IndexWriter writer = uh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
assertEquals(64.0, mp.getMaxMergeMB(), 0);
uh.close();
}
public void testProps() throws Exception {
ExposeWriterHandler uh = new ExposeWriterHandler();
IndexWriter writer = uh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
assertEquals(2, cms.getMaxThreadCount());
uh.close();
}
}

View File

@ -17,8 +17,6 @@ package org.apache.solr.core;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LogByteSizeMergePolicy;
@ -33,33 +31,18 @@ public class TestPropInjectDefaults extends SolrTestCaseJ4 {
initCore("solrconfig-propinject-indexdefault.xml", "schema.xml");
}
class ExposeWriterHandler extends DirectUpdateHandler2 {
public ExposeWriterHandler() throws IOException {
super(h.getCore());
}
public IndexWriter getWriter() throws IOException {
forceOpenWriter();
return writer;
}
}
@Test
public void testMergePolicyDefaults() throws Exception {
ExposeWriterHandler uh = new ExposeWriterHandler();
IndexWriter writer = uh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
LogByteSizeMergePolicy mp = (LogByteSizeMergePolicy)writer.getConfig().getMergePolicy();
assertEquals(32.0, mp.getMaxMergeMB(), 0);
uh.close();
}
@Test
public void testPropsDefaults() throws Exception {
ExposeWriterHandler uh = new ExposeWriterHandler();
IndexWriter writer = uh.getWriter();
IndexWriter writer = ((DirectUpdateHandler2)h.getCore().getUpdateHandler()).getIndexWriterProvider().getIndexWriter();
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler)writer.getConfig().getMergeScheduler();
assertEquals(4, cms.getMaxThreadCount());
uh.close();
}
}

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.noggit.JSONParser;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
@ -70,7 +69,7 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
"},\n" +
"\n" +
"'commit': {},\n" +
"'optimize': { 'waitFlush':false, 'waitSearcher':false },\n" +
"'optimize': { 'waitSearcher':false },\n" +
"\n" +
"'delete': { 'id':'ID' },\n" +
"'delete': { 'query':'QUERY' },\n" +
@ -109,12 +108,10 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
assertEquals( 2, p.commitCommands.size() );
CommitUpdateCommand commit = p.commitCommands.get( 0 );
assertFalse( commit.optimize );
assertTrue( commit.waitFlush );
assertTrue( commit.waitSearcher );
commit = p.commitCommands.get( 1 );
assertTrue( commit.optimize );
assertFalse( commit.waitFlush );
assertFalse( commit.waitSearcher );

View File

@ -25,52 +25,47 @@ import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.*;
import org.apache.solr.search.*;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.handler.XmlUpdateRequestHandler;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.AbstractSolrTestCase;
import org.apache.solr.util.RefCounted;
class CommitListener implements SolrEventListener {
public volatile boolean triggered = false;
public volatile SolrIndexSearcher currentSearcher;
public SolrCore core;
public CommitListener(SolrCore core) {
this.core = core;
}
class NewSearcherListener implements SolrEventListener {
private volatile boolean triggered = false;
@Override
public void init(NamedList args) {}
public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) {
this.currentSearcher = currentSearcher;
triggered = true;
}
public void postCommit() {
@Override
public void newSearcher(SolrIndexSearcher newSearcher,
SolrIndexSearcher currentSearcher) {
triggered = true;
}
@Override
public void postCommit() {}
@Override
public void postSoftCommit() {}
public void reset() {
triggered=false;
triggered = false;
}
public boolean waitForCommit(int timeout) {
//triggered = false;
for (int towait=timeout; towait > 0; towait -= 250) {
try {
boolean waitForNewSearcher(int timeout) {
long timeoutTime = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < timeoutTime) {
if (triggered) {
RefCounted<SolrIndexSearcher> holder = core.getSearcher();
SolrIndexSearcher s = holder.get();
holder.decref();
// since there could be two commits in a row, don't test for a specific new searcher
// just test that the old one has been replaced.
if (s != currentSearcher) return true;
// it may be that a commit just happened, but the new searcher hasn't been registered yet.
return true;
}
Thread.sleep( 250 );
try {
Thread.sleep(250);
} catch (InterruptedException e) {}
}
return false;
}
@ -97,61 +92,60 @@ public class AutoCommitTest extends AbstractSolrTestCase {
return streams;
}
/* This test is causing too many failures on one of the build slaves.
Temporarily disabled. -Mike Klaas */
public void XXXtestMaxDocs() throws Exception {
public void testMaxDocs() throws Exception {
SolrCore core = h.getCore();
CommitListener trigger = new CommitListener(core);
NewSearcherListener trigger = new NewSearcherListener();
DirectUpdateHandler2 updater = (DirectUpdateHandler2)core.getUpdateHandler();
DirectUpdateHandler2.CommitTracker tracker = updater.tracker;
tracker.timeUpperBound = 100000;
DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler();
CommitTracker tracker = updateHandler.commitTracker;
tracker.timeUpperBound = -1;
tracker.docsUpperBound = 14;
// updater.commitCallbacks.add(trigger);
core.registerNewSearcherListener(trigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
handler.init( null );
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
// Add a single document
// Add documents
SolrQueryResponse rsp = new SolrQueryResponse();
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
for( int i=0; i<14; i++ ) {
req.setContentStreams( toContentStreams(
adoc("id", "A"+i, "subject", "info" ), null ) );
adoc("id", Integer.toString(i), "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
}
// It should not be there right away
assertQ("shouldn't find any", req("id:A1") ,"//result[@numFound=0]" );
assertQ("shouldn't find any", req("id:1") ,"//result[@numFound=0]" );
assertEquals( 0, tracker.getCommitCount());
req.setContentStreams( toContentStreams(
adoc("id", "A14", "subject", "info" ), null ) );
adoc("id", "14", "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
// Wait longer than the autocommit time
assertTrue(trigger.waitForCommit(20000));
assertTrue(trigger.waitForNewSearcher(10000));
req.setContentStreams( toContentStreams(
adoc("id", "A15", "subject", "info" ), null ) );
adoc("id", "15", "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
// Now make sure we can find it
assertQ("should find one", req("id:A14") ,"//result[@numFound=1]" );
assertQ("should find one", req("id:14") ,"//result[@numFound=1]" );
assertEquals( 1, tracker.getCommitCount());
// But not the one added afterward
assertQ("should not find one", req("id:A15") ,"//result[@numFound=0]" );
assertQ("should not find one", req("id:15") ,"//result[@numFound=0]" );
assertEquals( 1, tracker.getCommitCount());
}
public void testMaxTime() throws Exception {
SolrCore core = h.getCore();
CommitListener trigger = new CommitListener(core);
NewSearcherListener trigger = new NewSearcherListener();
core.registerNewSearcherListener(trigger);
DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler();
DirectUpdateHandler2.CommitTracker tracker = updater.tracker;
CommitTracker tracker = updater.commitTracker;
// too low of a number can cause a slow host to commit before the test code checks that it
// isn't there... causing a failure at "shouldn't find any"
tracker.timeUpperBound = 1000;
@ -175,7 +169,7 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForCommit(30000));
assertTrue(trigger.waitForNewSearcher(30000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) );
@ -190,7 +184,7 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertU( delI("529") );
assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForCommit(30000));
assertTrue(trigger.waitForNewSearcher(30000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) );
@ -208,7 +202,7 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForCommit(45000));
assertTrue(trigger.waitForNewSearcher(45000));
trigger.reset();
req.setContentStreams( toContentStreams(
@ -220,4 +214,152 @@ public class AutoCommitTest extends AbstractSolrTestCase {
assertQ("but not this", req("id:531") ,"//result[@numFound=0]" );
}
public void testSoftCommitMaxDocs() throws Exception {
SolrCore core = h.getCore();
NewSearcherListener trigger = new NewSearcherListener();
DirectUpdateHandler2 updateHandler = (DirectUpdateHandler2)core.getUpdateHandler();
CommitTracker tracker = updateHandler.commitTracker;
tracker.timeUpperBound = -1;
tracker.docsUpperBound = 8;
NewSearcherListener softTrigger = new NewSearcherListener();
CommitTracker softTracker = updateHandler.softCommitTracker;
softTracker.timeUpperBound = -1;
softTracker.docsUpperBound = 4;
core.registerNewSearcherListener(softTrigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
handler.init( null );
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
// Add documents
SolrQueryResponse rsp = new SolrQueryResponse();
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
for( int i=0; i<4; i++ ) {
req.setContentStreams( toContentStreams(
adoc("id", Integer.toString(i), "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
}
// It should not be there right away
assertQ("shouldn't find any", req("id:1") ,"//result[@numFound=0]" );
assertEquals( 0, tracker.getCommitCount());
req.setContentStreams( toContentStreams(
adoc("id", "4", "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
assertTrue(softTrigger.waitForNewSearcher(10000));
core.registerNewSearcherListener(trigger);
assertQ("should find 5", req("*:*") ,"//result[@numFound=5]" );
assertEquals( 1, softTracker.getCommitCount());
assertEquals( 0, tracker.getCommitCount());
req.setContentStreams( toContentStreams(
adoc("id", "5", "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
// Now make sure we can find it
assertQ("should find one", req("id:4") ,"//result[@numFound=1]" );
assertEquals( 1, softTracker.getCommitCount());
// But not the one added afterward
assertQ("should not find one", req("id:5") ,"//result[@numFound=0]" );
assertEquals( 1, softTracker.getCommitCount());
for( int i=6; i<10; i++ ) {
req.setContentStreams( toContentStreams(
adoc("id", Integer.toString(i), "subject", "info" ), null ) );
handler.handleRequest( req, rsp );
}
req.close();
assertTrue(trigger.waitForNewSearcher(10000));
assertQ("should find 10", req("*:*") ,"//result[@numFound=10]" );
assertEquals( 1, softTracker.getCommitCount());
assertEquals( 1, tracker.getCommitCount());
}
public void testSoftCommitMaxTime() throws Exception {
SolrCore core = h.getCore();
NewSearcherListener trigger = new NewSearcherListener();
core.registerNewSearcherListener(trigger);
DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler();
CommitTracker tracker = updater.commitTracker;
CommitTracker softTracker = updater.softCommitTracker;
// too low of a number can cause a slow host to commit before the test code checks that it
// isn't there... causing a failure at "shouldn't find any"
softTracker.timeUpperBound = 2000;
softTracker.docsUpperBound = -1;
// updater.commitCallbacks.add(trigger);
XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
handler.init( null );
MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
// Add a single document
SolrQueryResponse rsp = new SolrQueryResponse();
SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
req.setContentStreams( toContentStreams(
adoc("id", "529", "field_t", "what's inside?", "subject", "info"), null ) );
trigger.reset();
handler.handleRequest( req, rsp );
// Check it it is in the index
assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(30000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
// Now make sure we can find it
assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
// But not this one
assertQ("should find none", req("id:530") ,"//result[@numFound=0]" );
// Delete the document
assertU( delI("529") );
assertQ("deleted, but should still be there", req("id:529") ,"//result[@numFound=1]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(15000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "550", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
assertEquals( 2, softTracker.getCommitCount() );
assertQ("deleted and time has passed", req("id:529") ,"//result[@numFound=0]" );
// now make the call 5 times really fast and make sure it
// only commits once
req.setContentStreams( toContentStreams(
adoc("id", "500" ), null ) );
for( int i=0;i<5; i++ ) {
handler.handleRequest( req, rsp );
}
assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
// Wait longer than the autocommit time
assertTrue(trigger.waitForNewSearcher(15000));
trigger.reset();
req.setContentStreams( toContentStreams(
adoc("id", "531", "field_t", "what's inside?", "subject", "info"), null ) );
handler.handleRequest( req, rsp );
assertEquals( 3, softTracker.getCommitCount() );
assertEquals( 0, tracker.getCommitCount() );
assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
assertQ("but not this", req("id:531") ,"//result[@numFound=0]" );
}
}