mirror of https://github.com/apache/lucene.git
SOLR-7836: change SolrCoreState to use a reentrant locking implementation to fix deadlocks
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1701043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9e340d3e82
commit
082707f2d8
|
@ -153,7 +153,7 @@ Bug Fixes
|
|||
whitelist valid uses of currentTimeMillis (Ramkumar Aiyengar)
|
||||
|
||||
* SOLR-7836: Possible deadlock when closing refcounted index writers.
|
||||
(Jessica Cheng Mallet, Erick Erickson)
|
||||
(Jessica Cheng Mallet, Erick Erickson, Mark Miller, yonik)
|
||||
|
||||
* SOLR-7869: Overseer does not handle BadVersionException correctly and, in some cases,
|
||||
can go into an infinite loop if cluster state in ZooKeeper is modified externally.
|
||||
|
|
|
@ -18,8 +18,10 @@ package org.apache.solr.update;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.solr.cloud.RecoveryStrategy;
|
||||
|
@ -45,10 +47,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
|||
private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", 10000);
|
||||
|
||||
private final ActionThrottle leaderThrottle = new ActionThrottle("leader", 5000);
|
||||
|
||||
// protects pauseWriter and writerFree
|
||||
private final Object writerPauseLock = new Object();
|
||||
|
||||
|
||||
// Use the readLock to retrieve the current IndexWriter (may be lazily opened)
|
||||
// Use the writeLock for changing index writers
|
||||
private final ReentrantReadWriteLock iwLock = new ReentrantReadWriteLock();
|
||||
|
||||
private SolrIndexWriter indexWriter = null;
|
||||
private DirectoryFactory directoryFactory;
|
||||
|
||||
|
@ -57,9 +60,6 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
|||
private volatile boolean lastReplicationSuccess = true;
|
||||
|
||||
private RefCounted<IndexWriter> refCntWriter;
|
||||
|
||||
private boolean pauseWriter;
|
||||
private boolean writerFree = true;
|
||||
|
||||
protected final ReentrantLock commitLock = new ReentrantLock();
|
||||
|
||||
|
@ -86,189 +86,141 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
|
|||
@Override
|
||||
public RefCounted<IndexWriter> getIndexWriter(SolrCore core)
|
||||
throws IOException {
|
||||
synchronized (writerPauseLock) {
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
|
||||
}
|
||||
|
||||
while (pauseWriter) {
|
||||
try {
|
||||
writerPauseLock.wait(100);
|
||||
} catch (InterruptedException e) {}
|
||||
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
|
||||
|
||||
boolean succeeded = false;
|
||||
lock(iwLock.readLock());
|
||||
try {
|
||||
// Multiple readers may be executing this, but we only want one to open the writer on demand.
|
||||
synchronized (this) {
|
||||
if (core == null) {
|
||||
// core == null is a signal to just return the current writer, or null if none.
|
||||
initRefCntWriter();
|
||||
if (refCntWriter == null) return null;
|
||||
} else {
|
||||
if (indexWriter == null) {
|
||||
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
|
||||
}
|
||||
initRefCntWriter();
|
||||
}
|
||||
|
||||
refCntWriter.incref();
|
||||
succeeded = true; // the returned RefCounted<IndexWriter> will release the readLock on a decref()
|
||||
return refCntWriter;
|
||||
}
|
||||
|
||||
if (core == null) {
|
||||
// core == null is a signal to just return the current writer, or null
|
||||
// if none.
|
||||
initRefCntWriter();
|
||||
if (refCntWriter == null) return null;
|
||||
} else {
|
||||
if (indexWriter == null) {
|
||||
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
|
||||
}
|
||||
initRefCntWriter();
|
||||
} finally {
|
||||
// if we failed to return the IW for some other reason, we should unlock.
|
||||
if (!succeeded) {
|
||||
iwLock.readLock().unlock();
|
||||
}
|
||||
|
||||
writerFree = false;
|
||||
refCntWriter.incref();
|
||||
writerPauseLock.notifyAll();
|
||||
return refCntWriter;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void initRefCntWriter() {
|
||||
// TODO: since we moved to a read-write lock, and don't rely on the count to close the writer, we don't really
|
||||
// need this class any more. It could also be a singleton created at the same time as SolrCoreState
|
||||
// or we could change the API of SolrCoreState to just return the writer and then add a releaseWriter() call.
|
||||
if (refCntWriter == null && indexWriter != null) {
|
||||
refCntWriter = new RefCounted<IndexWriter>(indexWriter) {
|
||||
|
||||
@Override
|
||||
public void decref() {
|
||||
iwLock.readLock().unlock();
|
||||
super.decref(); // This is now redundant (since we switched to read-write locks), we don't really need to maintain our own reference count.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
synchronized (writerPauseLock) {
|
||||
writerFree = true;
|
||||
writerPauseLock.notifyAll();
|
||||
}
|
||||
// We rely on other code to actually close the IndexWriter, and there's nothing special to do when the ref count hits 0
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
|
||||
log.info("Creating new IndexWriter...");
|
||||
// acquires the lock or throws an exception if the CoreState has been closed.
|
||||
private void lock(Lock lock) {
|
||||
boolean acquired = false;
|
||||
do {
|
||||
try {
|
||||
acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("WARNING - Dangerous interrupt", e);
|
||||
}
|
||||
|
||||
// even if we failed to acquire, check if we are closed
|
||||
if (closed) {
|
||||
if (acquired) {
|
||||
lock.unlock();
|
||||
}
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed.");
|
||||
}
|
||||
} while (!acquired);
|
||||
}
|
||||
|
||||
// closes and opens index writers without any locking
|
||||
private void changeWriter(SolrCore core, boolean rollback, boolean openNewWriter) throws IOException {
|
||||
String coreName = core.getName();
|
||||
synchronized (writerPauseLock) {
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
|
||||
}
|
||||
|
||||
// we need to wait for the Writer to fall out of use
|
||||
// first lets stop it from being lent out
|
||||
pauseWriter = true;
|
||||
// then lets wait until it's out of use
|
||||
log.info("Waiting until IndexWriter is unused... core=" + coreName);
|
||||
|
||||
try {
|
||||
while (!writerFree) {
|
||||
try {
|
||||
writerPauseLock.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
|
||||
}
|
||||
}
|
||||
// We need to null this so it picks up the new writer next get call.
|
||||
// We do this before anything else in case we hit an exception.
|
||||
refCntWriter = null;
|
||||
IndexWriter iw = indexWriter; // temp reference just for closing
|
||||
indexWriter = null; // null this out now in case we fail, so we won't use the writer again
|
||||
|
||||
if (indexWriter != null) {
|
||||
if (!rollback) {
|
||||
try {
|
||||
log.info("Closing old IndexWriter... core=" + coreName);
|
||||
indexWriter.close();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error closing old IndexWriter. core="
|
||||
+ coreName, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
log.info("Rollback old IndexWriter... core=" + coreName);
|
||||
indexWriter.rollback();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error rolling back old IndexWriter. core="
|
||||
+ coreName, e);
|
||||
}
|
||||
}
|
||||
if (iw != null) {
|
||||
if (!rollback) {
|
||||
try {
|
||||
log.info("Closing old IndexWriter... core=" + coreName);
|
||||
iw.close();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error closing old IndexWriter. core=" + coreName, e);
|
||||
}
|
||||
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
|
||||
log.info("New IndexWriter is ready to be used.");
|
||||
// we need to null this so it picks up the new writer next get call
|
||||
refCntWriter = null;
|
||||
} finally {
|
||||
pauseWriter = false;
|
||||
writerPauseLock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void closeIndexWriter(SolrCore core, boolean rollback)
|
||||
throws IOException {
|
||||
log.info("Closing IndexWriter...");
|
||||
String coreName = core.getName();
|
||||
synchronized (writerPauseLock) {
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
|
||||
}
|
||||
|
||||
// we need to wait for the Writer to fall out of use
|
||||
// first lets stop it from being lent out
|
||||
pauseWriter = true;
|
||||
// then lets wait until it's out of use
|
||||
log.info("Waiting until IndexWriter is unused... core=" + coreName);
|
||||
try {
|
||||
while (!writerFree) {
|
||||
try {
|
||||
writerPauseLock.wait(100);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
|
||||
"SolrCoreState already closed");
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
log.info("Rollback old IndexWriter... core=" + coreName);
|
||||
iw.rollback();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error rolling back old IndexWriter. core=" + coreName, e);
|
||||
}
|
||||
|
||||
if (indexWriter != null) {
|
||||
if (!rollback) {
|
||||
try {
|
||||
log.info("Closing old IndexWriter... core=" + coreName);
|
||||
indexWriter.close();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error closing old IndexWriter. core="
|
||||
+ coreName, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
log.info("Rollback old IndexWriter... core=" + coreName);
|
||||
indexWriter.rollback();
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Error rolling back old IndexWriter. core="
|
||||
+ coreName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
pauseWriter = false;
|
||||
writerPauseLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void openIndexWriter(SolrCore core) throws IOException {
|
||||
log.info("Creating new IndexWriter...");
|
||||
synchronized (writerPauseLock) {
|
||||
if (closed) {
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
|
||||
}
|
||||
|
||||
try {
|
||||
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
|
||||
log.info("New IndexWriter is ready to be used.");
|
||||
// we need to null this so it picks up the new writer next get call
|
||||
refCntWriter = null;
|
||||
} finally {
|
||||
pauseWriter = false;
|
||||
writerPauseLock.notifyAll();
|
||||
}
|
||||
if (openNewWriter) {
|
||||
indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
|
||||
log.info("New IndexWriter is ready to be used.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void rollbackIndexWriter(SolrCore core) throws IOException {
|
||||
newIndexWriter(core, true);
|
||||
public void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
|
||||
lock(iwLock.writeLock());
|
||||
try {
|
||||
changeWriter(core, rollback, true);
|
||||
} finally {
|
||||
iwLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeIndexWriter(SolrCore core, boolean rollback) throws IOException {
|
||||
lock(iwLock.writeLock());
|
||||
changeWriter(core, rollback, false);
|
||||
// Do not unlock the writeLock in this method. It will be unlocked by the openIndexWriter call (see base class javadoc)
|
||||
}
|
||||
|
||||
@Override
|
||||
public void openIndexWriter(SolrCore core) throws IOException {
|
||||
try {
|
||||
changeWriter(core, false, true);
|
||||
} finally {
|
||||
iwLock.writeLock().unlock(); //unlock even if we failed
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackIndexWriter(SolrCore core) throws IOException {
|
||||
changeWriter(core, true, true);
|
||||
}
|
||||
|
||||
protected SolrIndexWriter createMainIndexWriter(SolrCore core, String name) throws IOException {
|
||||
|
|
|
@ -238,12 +238,12 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
} else {
|
||||
writer.addDocument(cmd.getLuceneDocument());
|
||||
}
|
||||
if (ulog != null) ulog.add(cmd);
|
||||
|
||||
} finally {
|
||||
iw.decref();
|
||||
}
|
||||
synchronized (solrCoreState.getUpdateLock()) {
|
||||
if (ulog != null) ulog.add(cmd);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
|
||||
|
@ -278,20 +278,23 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
|
||||
writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
|
||||
}
|
||||
|
||||
|
||||
// Add to the transaction log *after* successfully adding to the
|
||||
// index, if there was no error.
|
||||
// This ordering ensures that if we log it, it's definitely been
|
||||
// added to the the index.
|
||||
// This also ensures that if a commit sneaks in-between, that we
|
||||
// know everything in a particular
|
||||
// log version was definitely committed.
|
||||
if (ulog != null) ulog.add(cmd);
|
||||
|
||||
} finally {
|
||||
iw.decref();
|
||||
}
|
||||
|
||||
// Add to the transaction log *after* successfully adding to the
|
||||
// index, if there was no error.
|
||||
// This ordering ensures that if we log it, it's definitely been
|
||||
// added to the the index.
|
||||
// This also ensures that if a commit sneaks in-between, that we
|
||||
// know everything in a particular
|
||||
// log version was definitely committed.
|
||||
synchronized (solrCoreState.getUpdateLock()) {
|
||||
if (ulog != null) ulog.add(cmd);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private void addAndDelete(AddUpdateCommand cmd, List<UpdateLog.DBQ> deletesAfter) throws IOException {
|
||||
|
@ -323,13 +326,12 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
for (Query q : dbqList) {
|
||||
writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
|
||||
}
|
||||
if (ulog != null) ulog.add(cmd, true); // this needs to be protected by update lock
|
||||
}
|
||||
} finally {
|
||||
iw.decref();
|
||||
}
|
||||
synchronized (solrCoreState.getUpdateLock()) {
|
||||
if (ulog != null) ulog.add(cmd, true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
|
||||
|
@ -434,6 +436,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
// part of a commit. DBQ needs to signal that a fresh reader will be needed for
|
||||
// a realtime view of the index. When a new searcher is opened after a DBQ, that
|
||||
// flag can be cleared. If those thing happen concurrently, it's not thread safe.
|
||||
// Also, ulog.deleteByQuery clears caches and is thus not safe to be called between
|
||||
// preSoftCommit/postSoftCommit and thus we use the updateLock to prevent this (just
|
||||
// as we use around ulog.preCommit... also see comments in ulog.postSoftCommit)
|
||||
//
|
||||
synchronized (solrCoreState.getUpdateLock()) {
|
||||
if (delAll) {
|
||||
|
@ -447,7 +452,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
|
|||
}
|
||||
}
|
||||
|
||||
if (ulog != null) ulog.deleteByQuery(cmd);
|
||||
if (ulog != null) ulog.deleteByQuery(cmd); // this needs to be protected by the update lock
|
||||
}
|
||||
|
||||
madeIt = true;
|
||||
|
|
|
@ -39,10 +39,10 @@ public abstract class SolrCoreState {
|
|||
public static Logger log = LoggerFactory.getLogger(SolrCoreState.class);
|
||||
|
||||
protected boolean closed = false;
|
||||
private final Object deleteLock = new Object();
|
||||
private final Object updateLock = new Object();
|
||||
|
||||
public Object getUpdateLock() {
|
||||
return deleteLock;
|
||||
return updateLock;
|
||||
}
|
||||
|
||||
private int solrCoreStateRefCnt = 1;
|
||||
|
|
Loading…
Reference in New Issue