SOLR-6002: Fix a couple of ugly issues around SolrIndexWriter close and rollback as well as how SolrIndexWriter manages it's ref counted directory instance.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1589294 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-04-22 21:23:31 +00:00
parent 26be737077
commit 678758ae57
6 changed files with 65 additions and 59 deletions

View File

@ -117,6 +117,10 @@ Bug Fixes
* SOLR-5993: ZkController can warn about shard leader conflict even after the conflict * SOLR-5993: ZkController can warn about shard leader conflict even after the conflict
is resolved. (Gregory Chanan via shalin) is resolved. (Gregory Chanan via shalin)
* SOLR-6002: Fix a couple of ugly issues around SolrIndexWriter close and
rollback as well as how SolrIndexWriter manages it's ref counted directory
instance. (Mark Miller)
Other Changes Other Changes
--------------------- ---------------------

View File

@ -173,6 +173,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
this.getClass().getSimpleName(), val); this.getClass().getSimpleName(), val);
try { try {
// if there are still refs out, we have to wait for them // if there are still refs out, we have to wait for them
assert val.refCnt > -1 : val.refCnt;
int cnt = 0; int cnt = 0;
while(val.refCnt != 0) { while(val.refCnt != 0) {
wait(100); wait(100);

View File

@ -17,7 +17,6 @@
package org.apache.solr.core; package org.apache.solr.core;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexDeletionPolicy;
@ -89,6 +88,7 @@ import org.apache.solr.update.processor.RunUpdateProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessorChain; import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory; import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.IOUtils;
import org.apache.solr.util.PropertiesInputStream; import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.RefCounted; import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.NamedListInitializedPlugin; import org.apache.solr.util.plugin.NamedListInitializedPlugin;
@ -418,8 +418,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
ParserConfigurationException, SAXException { ParserConfigurationException, SAXException {
solrCoreState.increfSolrCoreState(); solrCoreState.increfSolrCoreState();
boolean indexDirChange = !getNewIndexDir().equals(getIndexDir());
if (!getNewIndexDir().equals(getIndexDir())) { if (indexDirChange || !coreConfig.getSolrConfig().nrtMode) {
// the directory is changing, don't pass on state // the directory is changing, don't pass on state
prev = null; prev = null;
} }
@ -428,6 +428,8 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
coreConfig.getIndexSchema(), coreDescriptor, updateHandler, this.solrDelPolicy, prev); coreConfig.getIndexSchema(), coreDescriptor, updateHandler, this.solrDelPolicy, prev);
core.solrDelPolicy = this.solrDelPolicy; core.solrDelPolicy = this.solrDelPolicy;
// we open a new indexwriter to pick up the latest config
core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false); core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
core.getSearcher(true, false, null, true); core.getSearcher(true, false, null, true);
@ -518,7 +520,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true, SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec); getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
writer.close(); writer.shutdown();
} }
@ -854,7 +856,15 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
if (e instanceof OutOfMemoryError) { if (e instanceof OutOfMemoryError) {
throw (OutOfMemoryError)e; throw (OutOfMemoryError)e;
} }
close();
try {
this.close();
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError)t;
}
log.error("Error while closing", t);
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
e.getMessage(), e); e.getMessage(), e);

View File

@ -68,7 +68,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
closer.closeWriter(indexWriter); closer.closeWriter(indexWriter);
} else if (indexWriter != null) { } else if (indexWriter != null) {
log.info("closing IndexWriter..."); log.info("closing IndexWriter...");
indexWriter.close(); indexWriter.shutdown();
} }
indexWriter = null; indexWriter = null;
} catch (Exception e) { } catch (Exception e) {
@ -161,7 +161,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
if (!rollback) { if (!rollback) {
try { try {
log.info("Closing old IndexWriter... core=" + coreName); log.info("Closing old IndexWriter... core=" + coreName);
indexWriter.close(); indexWriter.shutdown();
} catch (Exception e) { } catch (Exception e) {
SolrException.log(log, "Error closing old IndexWriter. core=" SolrException.log(log, "Error closing old IndexWriter. core="
+ coreName, e); + coreName, e);

View File

@ -745,7 +745,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
try { try {
if (tryToCommit) { if (tryToCommit) {
log.info("Committing on IndexWriter close.");
CommitUpdateCommand cmd = new CommitUpdateCommand(req, false); CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
cmd.openSearcher = false; cmd.openSearcher = false;
cmd.waitSearcher = false; cmd.waitSearcher = false;
@ -786,7 +786,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
} }
} }
if (writer != null) writer.shutdown(); if (writer != null) writer.close();
} finally { } finally {
solrCoreState.getCommitLock().unlock(); solrCoreState.getCommitLock().unlock();

View File

@ -17,10 +17,7 @@
package org.apache.solr.update; package org.apache.solr.update;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.Codec;
@ -29,11 +26,10 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.PrintStreamInfoStream;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.solr.core.DirectoryFactory; import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext; import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.IndexSchema;
import org.apache.solr.util.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,8 +49,12 @@ public class SolrIndexWriter extends IndexWriter {
* System.currentTimeMillis() when commit was called. */ * System.currentTimeMillis() when commit was called. */
public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec"; public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
private static final Object CLOSE_LOCK = new Object();
String name; String name;
private DirectoryFactory directoryFactory; private DirectoryFactory directoryFactory;
private InfoStream infoStream;
private Directory directory;
public static SolrIndexWriter create(String name, String path, DirectoryFactory directoryFactory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec) throws IOException { public static SolrIndexWriter create(String name, String path, DirectoryFactory directoryFactory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec) throws IOException {
@ -81,6 +81,8 @@ public class SolrIndexWriter extends IndexWriter {
); );
log.debug("Opened Writer " + name); log.debug("Opened Writer " + name);
this.name = name; this.name = name;
infoStream = getConfig().getInfoStream();
this.directory = directory;
numOpens.incrementAndGet(); numOpens.incrementAndGet();
} }
@ -120,65 +122,54 @@ public class SolrIndexWriter extends IndexWriter {
*/ */
private volatile boolean isClosed = false; private volatile boolean isClosed = false;
@Override @Override
public void close() throws IOException { public void close() throws IOException {
log.debug("Closing Writer " + name); log.debug("Closing Writer " + name);
Directory directory = getDirectory();
final InfoStream infoStream = isClosed ? null : getConfig().getInfoStream();
try { try {
while (true) { super.close();
try { } catch (Throwable t) {
flush(true, true); if (t instanceof OutOfMemoryError) {
waitForMerges(); throw (OutOfMemoryError) t;
commit();
super.rollback();
} catch (ThreadInterruptedException e) {
// don't allow interruption
continue;
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
log.error("Error closing IndexWriter, trying rollback", t);
super.rollback();
}
if (IndexWriter.isLocked(directory)) {
try {
IndexWriter.unlock(directory);
} catch (Exception e) {
log.error("Coud not unlock directory after seemingly failed IndexWriter#close()", e);
}
}
break;
} }
log.error("Error closing IndexWriter", t);
} finally { } finally {
if (infoStream != null) { cleanup();
infoStream.close();
}
isClosed = true;
directoryFactory.release(directory);
numCloses.incrementAndGet();
} }
} }
@Override @Override
public void rollback() throws IOException { public void rollback() throws IOException {
Directory dir = getDirectory(); log.debug("Rollback Writer " + name);
try { try {
while (true) { super.rollback();
try { } catch (Throwable t) {
super.rollback(); if (t instanceof OutOfMemoryError) {
} catch (ThreadInterruptedException e) { throw (OutOfMemoryError) t;
// don't allow interruption
continue;
}
break;
} }
log.error("Exception rolling back IndexWriter", t);
} finally { } finally {
isClosed = true; cleanup();
directoryFactory.release(dir); }
}
private void cleanup() throws IOException {
// It's kind of an implementation detail whether
// or not IndexWriter#close calls rollback, so
// we assume it may or may not
boolean doClose = false;
synchronized (CLOSE_LOCK) {
if (!isClosed) {
doClose = true;
isClosed = true;
}
}
if (doClose) {
if (infoStream != null) {
IOUtils.closeQuietly(infoStream);
}
numCloses.incrementAndGet(); numCloses.incrementAndGet();
directoryFactory.release(directory);
} }
} }