SOLR-3656 SOLR-3662: core reload improvements

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1364409 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-07-22 19:45:00 +00:00
parent bbd6934008
commit 19aa69b0cd
9 changed files with 106 additions and 36 deletions

View File

@ -120,6 +120,15 @@ Bug Fixes
* SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create
spikes of threads in the thousands. (yonik, Mark Miller)
* SOLR-3656: A core reload now always uses the same dataDir. (Mark Miller, yonik)
* SOLR-3662: Core reload bugs: a reload always obtained a non-NRT searcher, which
could go back in time with respect to the previous core's NRT searcher. Versioning
did not work correctly across a core reload, and update handler synchronization
was changed to synchronize on core state since more than on update handler
can coexist for a single index during a reload. (yonik)
Other Changes
----------------------

View File

@ -945,7 +945,7 @@ public class CoreContainer
}
}
SolrCore newCore = core.reload(solrLoader);
SolrCore newCore = core.reload(solrLoader, core);
// keep core to orig name link
String origName = coreToOrigName.remove(core);
if (origName != null) {

View File

@ -357,7 +357,7 @@ public final class SolrCore implements SolrInfoMBean {
return responseWriters.put(name, responseWriter);
}
public SolrCore reload(SolrResourceLoader resourceLoader) throws IOException,
public SolrCore reload(SolrResourceLoader resourceLoader, SolrCore prev) throws IOException,
ParserConfigurationException, SAXException {
// TODO - what if indexwriter settings have changed
@ -368,8 +368,8 @@ public final class SolrCore implements SolrInfoMBean {
getSchema().getResourceName(), null);
updateHandler.incref();
SolrCore core = new SolrCore(getName(), null, config,
schema, coreDescriptor, updateHandler);
SolrCore core = new SolrCore(getName(), getDataDir(), config,
schema, coreDescriptor, updateHandler, prev);
return core;
}
@ -548,7 +548,7 @@ public final class SolrCore implements SolrInfoMBean {
* @since solr 1.3
*/
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd) {
this(name, dataDir, config, schema, cd, null);
this(name, dataDir, config, schema, cd, null, null);
}
/**
@ -561,7 +561,7 @@ public final class SolrCore implements SolrInfoMBean {
*
*@since solr 1.3
*/
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd, UpdateHandler updateHandler) {
public SolrCore(String name, String dataDir, SolrConfig config, IndexSchema schema, CoreDescriptor cd, UpdateHandler updateHandler, SolrCore prev) {
coreDescriptor = cd;
this.setName( name );
resourceLoader = config.getResourceLoader();
@ -640,10 +640,31 @@ public final class SolrCore implements SolrInfoMBean {
}
});
// use the (old) writer to open the first searcher
RefCounted<IndexWriter> iwRef = null;
if (prev != null) {
iwRef = prev.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
if (iwRef != null) {
final IndexWriter iw = iwRef.get();
newReaderCreator = new Callable<DirectoryReader>() {
@Override
public DirectoryReader call() throws Exception {
return DirectoryReader.open(iw, true);
}
};
}
}
// Open the searcher *before* the update handler so we don't end up opening
// one in the middle.
// With lockless commits in Lucene now, this probably shouldn't be an issue anymore
getSearcher(false,false,null);
try {
getSearcher(false,false,null,true);
} finally {
newReaderCreator = null;
if (iwRef != null) iwRef.decref();
}
String updateHandlerClass = solrConfig.getUpdateHandlerInfo().className;
@ -1057,7 +1078,7 @@ public final class SolrCore implements SolrInfoMBean {
private final int maxWarmingSearchers; // max number of on-deck searchers allowed
private RefCounted<SolrIndexSearcher> realtimeSearcher;
private Callable<DirectoryReader> newReaderCreator;
/**
* Return a registered {@link RefCounted}&lt;{@link SolrIndexSearcher}&gt; with
@ -1208,9 +1229,20 @@ public final class SolrCore implements SolrInfoMBean {
tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
} else {
// newestSearcher == null at this point
if (newReaderCreator != null) {
// this is set in the constructor if there is a currently open index writer
// so that we pick up any uncommitted changes and so we don't go backwards
// in time on a core reload
DirectoryReader newReader = newReaderCreator.call();
tmp = new SolrIndexSearcher(this, schema, (realtime ? "realtime":"main"), newReader, true, !realtime, true, directoryFactory);
} else {
// normal open that happens at startup
// verbose("non-reopen START:");
tmp = new SolrIndexSearcher(this, newIndexDir, schema, getSolrConfig().indexConfig, "main", true, directoryFactory);
// verbose("non-reopen DONE: searcher=",tmp);
}
}
List<RefCounted<SolrIndexSearcher>> searcherList = realtime ? _realtimeSearchers : _searchers;

View File

@ -55,12 +55,19 @@ public final class DefaultSolrCoreState extends SolrCoreState {
@Override
public synchronized RefCounted<IndexWriter> getIndexWriter(SolrCore core)
throws IOException {
if (core == null) {
// core == null is a signal to just return the current writer, or null if none.
if (refCntWriter != null) refCntWriter.incref();
return refCntWriter;
}
while (pauseWriter) {
try {
wait();
} catch (InterruptedException e) {}
}
if (indexWriter == null) {
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", false,
false);

View File

@ -343,7 +343,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// currently for testing purposes. Do a delete of complete index w/o worrying about versions, don't log, clean up most state in update log, etc
if (delAll && cmd.getVersion() == -Long.MAX_VALUE) {
synchronized (this) {
synchronized (solrCoreState) {
deleteAll();
ulog.deleteAll();
return;
@ -356,7 +356,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// a realtime view of the index. When a new searcher is opened after a DBQ, that
// flag can be cleared. If those thing happen concurrently, it's not thread safe.
//
synchronized (this) {
synchronized (solrCoreState) {
if (delAll) {
deleteAll();
} else {
@ -392,7 +392,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
// see comment in deleteByQuery
synchronized (this) {
synchronized (solrCoreState) {
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
IndexWriter writer = iw.get();
@ -518,7 +518,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
if (!cmd.softCommit) {
synchronized (this) { // sync is currently needed to prevent preCommit
synchronized (solrCoreState) { // sync is currently needed to prevent preCommit
// from being called between preSoft and
// postSoft... see postSoft comments.
if (ulog != null) ulog.preCommit(cmd);
@ -547,14 +547,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
if (cmd.softCommit) {
// ulog.preSoftCommit();
synchronized (this) {
synchronized (solrCoreState) {
if (ulog != null) ulog.preSoftCommit(cmd);
core.getSearcher(true, false, waitSearcher, true);
if (ulog != null) ulog.postSoftCommit(cmd);
}
// ulog.postSoftCommit();
} else {
synchronized (this) {
synchronized (solrCoreState) {
if (ulog != null) ulog.preSoftCommit(cmd);
if (cmd.openSearcher) {
core.getSearcher(true, false, waitSearcher);
@ -705,7 +705,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// TODO: keep other commit callbacks from being called?
// this.commit(cmd); // too many test failures using this method... is it because of callbacks?
synchronized (this) {
synchronized (solrCoreState) {
ulog.preCommit(cmd);
}
@ -714,7 +714,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
writer.commit(commitData);
synchronized (this) {
synchronized (solrCoreState) {
ulog.postCommit(cmd);
}
}

View File

@ -138,7 +138,7 @@ public class UpdateLog implements PluginInfoInitialized {
private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
private volatile UpdateHandler uhandler; // a core reload can change this reference!
volatile UpdateHandler uhandler; // a core reload can change this reference!
private volatile boolean cancelApplyBufferUpdate;
List<Long> startingVersions;
int startingOperation; // last operation in the logs on startup
@ -177,6 +177,9 @@ public class UpdateLog implements PluginInfoInitialized {
if (debug) {
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", next id=" + id, " this is a reopen... nothing else to do.");
}
versionInfo.reload();
// on a normal reopen, we currently shouldn't have to do anything
return;
}
@ -209,7 +212,7 @@ public class UpdateLog implements PluginInfoInitialized {
if (newestLogsOnStartup.size() >= 2) break;
}
versionInfo = new VersionInfo(uhandler, 256);
versionInfo = new VersionInfo(this, 256);
// TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();

View File

@ -19,7 +19,6 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -36,16 +35,15 @@ import org.apache.solr.util.RefCounted;
public class VersionInfo {
public static final String VERSION_FIELD="_version_";
private SolrCore core;
private UpdateHandler updateHandler;
private final UpdateLog ulog;
private final VersionBucket[] buckets;
private SchemaField versionField;
private SchemaField idField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
this.updateHandler = updateHandler;
this.core = updateHandler.core;
public VersionInfo(UpdateLog ulog, int nBuckets) {
this.ulog = ulog;
SolrCore core = ulog.uhandler.core;
versionField = core.getSchema().getFieldOrNull(VERSION_FIELD);
idField = core.getSchema().getUniqueKeyField();
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
@ -54,6 +52,10 @@ public class VersionInfo {
}
}
public void reload() {
}
public SchemaField getVersionField() {
return versionField;
}
@ -143,14 +145,14 @@ public class VersionInfo {
}
public Long lookupVersion(BytesRef idBytes) {
return updateHandler.ulog.lookupVersion(idBytes);
return ulog.lookupVersion(idBytes);
}
public Long getVersionFromIndex(BytesRef idBytes) {
// TODO: we could cache much of this and invalidate during a commit.
// TODO: most DocValues classes are threadsafe - expose which.
RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
RefCounted<SolrIndexSearcher> newestSearcher = ulog.uhandler.core.getRealtimeSearcher();
try {
SolrIndexSearcher searcher = newestSearcher.get();
long lookup = searcher.lookupId(idBytes);

View File

@ -22,12 +22,11 @@ import org.junit.Ignore;
import java.util.Random;
@Ignore
public class TestReload extends TestRTGBase {
@BeforeClass
public static void beforeClass() throws Exception {
useFactory(null); // force FS directory
// useFactory(null); // force FS directory
initCore("solrconfig-tlog.xml","schema15.xml");
}
@ -37,36 +36,48 @@ public class TestReload extends TestRTGBase {
assertU(commit());
long version = addAndGetVersion(sdoc("id","1") , null);
// h.getCoreContainer().reload(h.getCore().getName());
assertU(commit("softCommit","true")); // should cause a RTG searcher to be opened
assertU(commit("openSearcher","false")); // should cause a RTG searcher to be opened
assertJQ(req("qt","/get","id","1")
,"=={'doc':{'id':'1','_version_':" + version + "}}"
);
h.reload();
// should also use the RTG searcher (commit should have cleared the translog cache)
assertJQ(req("qt","/get","id","1")
,"=={'doc':{'id':'1','_version_':" + version + "}}"
);
assertU(commit("softCommit","true")); // open a normal (caching) NRT searcher
h.getCoreContainer().reload(h.getCore().getName());
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
Random rand = random();
int iter = atLeast(20);
for (int i=0; i<iter; i++) {
if (rand.nextBoolean()) {
// System.out.println("!!! add");
version = addAndGetVersion(sdoc("id","1") , null);
}
if (rand.nextBoolean()) {
if (rand.nextBoolean()) {
// System.out.println("!!! flush");
assertU(commit("openSearcher","false")); // should cause a RTG searcher to be opened as well
} else {
assertU(commit("softCommit", ""+rand.nextBoolean()));
boolean softCommit = rand.nextBoolean();
System.out.println("!!! softCommit" + softCommit);
// assertU(commit("softCommit", ""+softCommit));
}
}
if (rand.nextBoolean()) {
// RTG should always be able to see the last version
// System.out.println("!!! rtg");
assertJQ(req("qt","/get","id","1")
,"=={'doc':{'id':'1','_version_':" + version + "}}"
);
@ -74,13 +85,16 @@ public class TestReload extends TestRTGBase {
if (rand.nextBoolean()) {
// a normal search should always find 1 doc
// System.out.println("!!! q");
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
}
// TODO: randomly do a reload
// but the test currently fails without this!
if (rand.nextBoolean()) {
// System.out.println("!!! reload");
h.reload();
}
}
// test framework should ensure that all searchers opened have been closed.

View File

@ -223,6 +223,9 @@ public class TestHarness {
}
public void reload() throws Exception {
container.reload(coreName);
}
/**
* Processes an "update" (add, commit or optimize) and