SOLR-3861: Refactor SolrCoreState so that it's managed by SolrCore.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1392360 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-10-01 14:42:44 +00:00
parent e910a61b41
commit f89cc61c55
7 changed files with 109 additions and 116 deletions

View File

@ -68,6 +68,9 @@ Other Changes
* SOLR-3899: SolrCore should not log at warning level when the index directory
changes - it's an info event. (Tobias Bergman, Mark Miller)
* SOLR-3861: Refactor SolrCoreState so that it's managed by SolrCore.
(Mark Miller, hossman)
================== 4.0.0 ==================
Versions of Major Components

View File

@ -62,10 +62,13 @@ import org.apache.solr.search.QParserPlugin;
import org.apache.solr.search.SolrFieldCacheMBean;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.ValueSourceParser;
import org.apache.solr.update.DefaultSolrCoreState;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
import org.apache.solr.update.processor.LogUpdateProcessorFactory;
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
@ -140,6 +143,9 @@ public final class SolrCore implements SolrInfoMBean {
private final IndexSchema schema;
private final String dataDir;
private final UpdateHandler updateHandler;
private final SolrCoreState solrCoreState;
private int solrCoreStateRefCnt = 1;
private final long startTime;
private final RequestHandlers reqHandlers;
private final Map<String,SearchComponent> searchComponents;
@ -369,7 +375,8 @@ public final class SolrCore implements SolrInfoMBean {
IndexSchema schema = new IndexSchema(config,
getSchema().getResourceName(), null);
updateHandler.incref();
increfSolrCoreState();
SolrCore core = new SolrCore(getName(), getDataDir(), config,
schema, coreDescriptor, updateHandler, prev);
return core;
@ -637,8 +644,10 @@ public final class SolrCore implements SolrInfoMBean {
if (updateHandler == null) {
initDirectoryFactory();
solrCoreState = new DefaultSolrCoreState(getDirectoryFactory());
} else {
directoryFactory = updateHandler.getSolrCoreState().getDirectoryFactory();
solrCoreState = updateHandler.getSolrCoreState();
directoryFactory = solrCoreState.getDirectoryFactory();
this.isReloaded = true;
}
@ -790,6 +799,36 @@ public final class SolrCore implements SolrInfoMBean {
map.put("", def);
return map;
}
public SolrCoreState getSolrCoreState() {
return solrCoreState;
}
private void increfSolrCoreState() {
synchronized (solrCoreState) {
if (solrCoreStateRefCnt == 0) {
throw new IllegalStateException("IndexWriter has been closed");
}
solrCoreStateRefCnt++;
}
}
private void decrefSolrCoreState(IndexWriterCloser closer) {
synchronized (solrCoreState) {
solrCoreStateRefCnt--;
if (solrCoreStateRefCnt == 0) {
try {
log.info("Closing SolrCoreState");
solrCoreState.close(closer);
} catch (Throwable t) {
log.error("Error closing SolrCoreState", t);
}
}
}
}
/**
* @return an update processor registered to the given name. Throw an exception if this chain is undefined
@ -864,6 +903,21 @@ public final class SolrCore implements SolrInfoMBean {
SolrException.log(log, e);
}
try {
if (null != updateHandler) {
updateHandler.close();
}
} catch (Throwable e) {
SolrException.log(log,e);
}
if (updateHandler instanceof IndexWriterCloser) {
decrefSolrCoreState((IndexWriterCloser)updateHandler);
} else {
decrefSolrCoreState(null);
}
try {
searcherExecutor.shutdown();
if (!searcherExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
@ -893,20 +947,17 @@ public final class SolrCore implements SolrInfoMBean {
} catch (Throwable e) {
SolrException.log(log,e);
}
try {
if (null != updateHandler) {
updateHandler.close();
} else {
if (null != directoryFactory) {
// :HACK: normally we rely on updateHandler to do this,
// but what if updateHandler failed to init?
synchronized (solrCoreState) {
if (solrCoreStateRefCnt == 0) {
try {
directoryFactory.close();
} catch (Throwable t) {
SolrException.log(log, t);
}
}
} catch (Throwable e) {
SolrException.log(log,e);
}
if( closeHooks != null ) {
for( CloseHook hook : closeHooks ) {

View File

@ -39,7 +39,6 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// protects pauseWriter and writerFree
private final Object writerPauseLock = new Object();
private int refCnt = 1;
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
@ -56,6 +55,21 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
this.directoryFactory = directoryFactory;
}
private synchronized void closeIndexWriter(IndexWriterCloser closer) {
try {
log.info("SolrCoreState ref count has reached 0 - closing IndexWriter");
if (closer != null) {
log.info("closing IndexWriter with IndexWriterCloser");
closer.closeWriter(indexWriter);
} else if (indexWriter != null) {
log.info("closing IndexWriter...");
indexWriter.close();
}
} catch (Throwable t) {
log.error("Error during shutdown of writer.", t);
}
}
@Override
public synchronized RefCounted<IndexWriter> getIndexWriter(SolrCore core)
throws IOException {
@ -76,17 +90,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
if (indexWriter == null) {
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", false);
}
if (refCntWriter == null) {
refCntWriter = new RefCounted<IndexWriter>(indexWriter) {
@Override
public void close() {
synchronized (writerPauseLock) {
writerFree = true;
writerPauseLock.notifyAll();
}
}
};
}
initRefCntWriter();
writerFree = false;
writerPauseLock.notifyAll();
refCntWriter.incref();
@ -94,6 +98,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
}
private void initRefCntWriter() {
if (refCntWriter == null) {
refCntWriter = new RefCounted<IndexWriter>(indexWriter) {
@Override
public void close() {
synchronized (writerPauseLock) {
writerFree = true;
writerPauseLock.notifyAll();
}
}
};
}
}
@Override
public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
log.info("Creating new IndexWriter...");
@ -142,46 +160,6 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
}
@Override
public void decref(IndexWriterCloser closer) {
synchronized (this) {
refCnt--;
if (refCnt == 0) {
try {
log.info("SolrCoreState ref count has reached 0 - closing IndexWriter");
if (closer != null) {
closer.closeWriter(indexWriter);
} else if (indexWriter != null) {
indexWriter.close();
}
} catch (Throwable t) {
log.error("Error during shutdown of writer.", t);
}
try {
directoryFactory.close();
} catch (Throwable t) {
log.error("Error during shutdown of directory factory.", t);
}
try {
log.info("Closing SolrCoreState - canceling any ongoing recovery");
cancelRecovery();
} catch (Throwable t) {
log.error("Error cancelling recovery", t);
}
closed = true;
}
}
}
@Override
public synchronized void incref() {
if (refCnt == 0) {
throw new IllegalStateException("IndexWriter has been closed");
}
refCnt++;
}
@Override
public synchronized void rollbackIndexWriter(SolrCore core) throws IOException {
newIndexWriter(core, true);
@ -269,5 +247,12 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
public void failed() {
recoveryRunning = false;
}
@Override
public synchronized void close(IndexWriterCloser closer) {
closed = true;
cancelRecovery();
closeIndexWriter(closer);
}
}

View File

@ -96,7 +96,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
public DirectUpdateHandler2(SolrCore core) {
super(core);
solrCoreState = new DefaultSolrCoreState(core.getDirectoryFactory());
solrCoreState = core.getSolrCoreState();
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
@ -111,13 +111,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) {
super(core);
if (updateHandler instanceof DirectUpdateHandler2) {
this.solrCoreState = ((DirectUpdateHandler2) updateHandler).solrCoreState;
} else {
// the impl has changed, so we cannot use the old state - decref it
updateHandler.decref();
solrCoreState = new DefaultSolrCoreState(core.getDirectoryFactory());
}
solrCoreState = core.getSolrCoreState();
UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()
.getUpdateHandlerInfo();
@ -658,8 +652,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
softCommitTracker.close();
numDocsPending.set(0);
solrCoreState.decref(this);
}
@ -817,20 +809,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
return solrCoreState;
}
@Override
public void decref() {
try {
solrCoreState.decref(this);
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "", e);
}
}
@Override
public void incref() {
solrCoreState.incref();
}
// allow access for tests
public CommitTracker getCommitTracker() {
return commitTracker;

View File

@ -54,20 +54,6 @@ public abstract class SolrCoreState {
*/
public abstract RefCounted<IndexWriter> getIndexWriter(SolrCore core) throws IOException;
/**
* Decrement the number of references to this state. When then number of
* references hits 0, the state will close. If an optional closer is
* passed, that will be used to close the writer.
*
* @throws IOException If there is a low-level I/O error.
*/
public abstract void decref(IndexWriterCloser closer) throws IOException;
/**
* Increment the number of references to this state.
*/
public abstract void incref();
/**
* Rollback the current IndexWriter. When creating the new IndexWriter use the
* settings from the given {@link SolrCore}.
@ -90,4 +76,6 @@ public abstract class SolrCoreState {
public abstract void cancelRecovery();
public abstract void close(IndexWriterCloser closer);
}

View File

@ -60,7 +60,7 @@ public class SolrIndexWriter extends IndexWriter {
final Directory d = directoryFactory.get(path, config.lockType, forceNewDirectory);
try {
w = new SolrIndexWriter(name, path, d, create, schema,
config, delPolicy, codec, forceNewDirectory);
config, delPolicy, codec);
w.setDirectoryFactory(directoryFactory);
return w;
} finally {
@ -71,7 +71,7 @@ public class SolrIndexWriter extends IndexWriter {
}
}
private SolrIndexWriter(String name, String path, Directory directory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec, boolean forceNewDirectory) throws IOException {
private SolrIndexWriter(String name, String path, Directory directory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec) throws IOException {
super(directory,
config.toIndexWriterConfig(schema).
setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND).

View File

@ -19,7 +19,6 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.List;
import java.util.Vector;
import org.apache.solr.core.PluginInfo;
@ -29,7 +28,6 @@ import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,16 +55,6 @@ public abstract class UpdateHandler implements SolrInfoMBean {
protected UpdateLog ulog;
/**
* Called when a SolrCore using this UpdateHandler is closed.
*/
public abstract void decref();
/**
* Called when this UpdateHandler is shared with another SolrCore.
*/
public abstract void incref();
private void parseEventListeners() {
final Class<SolrEventListener> clazz = SolrEventListener.class;
final String label = "Event Listener";