SOLR-7836: Possible deadlock when closing refcounted index writers

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1694854 13f79535-47bb-0310-9956-ffa450edef68
Erick Erickson 2015-08-09 03:38:08 +00:00
parent 8958a1afff
commit 2676aa61b9
4 changed files with 432 additions and 188 deletions

solr/CHANGES.txt

@@ -115,6 +115,9 @@ Bug Fixes
 * SOLR-7859: Fix usage of currentTimeMillis instead of nanoTime in multiple places,
   whitelist valid uses of currentTimeMillis (Ramkumar Aiyengar)
 
+* SOLR-7836: Possible deadlock when closing refcounted index writers.
+  (Jessica Cheng Mallet, Erick Erickson)
+
 Optimizations
 ----------------------

solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java

@@ -100,27 +100,24 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
           throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Already closed");
         }
       }
 
       if (core == null) {
         // core == null is a signal to just return the current writer, or null
         // if none.
         initRefCntWriter();
         if (refCntWriter == null) return null;
-        writerFree = false;
-        writerPauseLock.notifyAll();
-        if (refCntWriter != null) refCntWriter.incref();
-        return refCntWriter;
-      }
-
-      if (indexWriter == null) {
-        indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
-      }
-      initRefCntWriter();
+      } else {
+        if (indexWriter == null) {
+          indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
+        }
+        initRefCntWriter();
+      }
 
       writerFree = false;
-      writerPauseLock.notifyAll();
       refCntWriter.incref();
+      writerPauseLock.notifyAll();
       return refCntWriter;
     }
   }
@@ -152,18 +149,18 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
     pauseWriter = true;
     // then lets wait until it's out of use
     log.info("Waiting until IndexWriter is unused... core=" + coreName);
-    while (!writerFree) {
-      try {
-        writerPauseLock.wait(100);
-      } catch (InterruptedException e) {}
-
-      if (closed) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
-      }
-    }
 
+    try {
+      while (!writerFree) {
+        try {
+          writerPauseLock.wait(100);
+        } catch (InterruptedException e) {
+        }
+
+        if (closed) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCoreState already closed");
+        }
+      }
+
       if (indexWriter != null) {
         if (!rollback) {
           try {
@@ -188,7 +185,6 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
       // we need to null this so it picks up the new writer next get call
       refCntWriter = null;
-
+    } finally {
       pauseWriter = false;
       writerPauseLock.notifyAll();
     }
@@ -210,39 +206,44 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
     pauseWriter = true;
     // then lets wait until it's out of use
     log.info("Waiting until IndexWriter is unused... core=" + coreName);
-    while (!writerFree) {
-      try {
-        writerPauseLock.wait(100);
-      } catch (InterruptedException e) {}
-
-      if (closed) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-            "SolrCoreState already closed");
-      }
-    }
-
-    if (indexWriter != null) {
-      if (!rollback) {
-        try {
-          log.info("Closing old IndexWriter... core=" + coreName);
-          indexWriter.close();
-        } catch (Exception e) {
-          SolrException.log(log, "Error closing old IndexWriter. core="
-              + coreName, e);
-        }
-      } else {
-        try {
-          log.info("Rollback old IndexWriter... core=" + coreName);
-          indexWriter.rollback();
-        } catch (Exception e) {
-          SolrException.log(log, "Error rolling back old IndexWriter. core="
-              + coreName, e);
-        }
-      }
-    }
+    try {
+      while (!writerFree) {
+        try {
+          writerPauseLock.wait(100);
+        } catch (InterruptedException e) {
+        }
+
+        if (closed) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+              "SolrCoreState already closed");
+        }
+      }
+
+      if (indexWriter != null) {
+        if (!rollback) {
+          try {
+            log.info("Closing old IndexWriter... core=" + coreName);
+            indexWriter.close();
+          } catch (Exception e) {
+            SolrException.log(log, "Error closing old IndexWriter. core="
+                + coreName, e);
+          }
+        } else {
+          try {
+            log.info("Rollback old IndexWriter... core=" + coreName);
+            indexWriter.rollback();
+          } catch (Exception e) {
+            SolrException.log(log, "Error rolling back old IndexWriter. core="
+                + coreName, e);
+          }
+        }
+      }
+    } finally {
+      pauseWriter = false;
+      writerPauseLock.notifyAll();
+    }
     }
   }

   @Override

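The DefaultSolrCoreState changes above are the heart of the fix. The "pause the writer, wait for outstanding references to drain, swap or close, unpause" handshake could previously exit early — the wait loop throws "SolrCoreState already closed", or the close/rollback itself throws — without ever clearing pauseWriter, after which every later getIndexWriter() call waits on writerPauseLock forever. Wrapping the body in try/finally restores the invariant that the pause flag is always cleared. A minimal, self-contained sketch of the same monitor pattern (hypothetical names, simplified to a single lease; not Solr API):

final class PausableResource {
  private final Object pauseLock = new Object();
  private boolean paused = false; // set while a swap is in progress
  private boolean free = true;    // true when no lease is outstanding

  // Swap the underlying resource. replace.run() may throw -- that is the
  // path that used to strand 'paused' at true and hang all acquirers.
  void swap(Runnable replace) {
    synchronized (pauseLock) {
      paused = true;                      // block new acquire() calls
      try {
        while (!free) {                   // wait() releases pauseLock,
          try {                           // so release() can still run
            pauseLock.wait(100);
          } catch (InterruptedException ignored) {}
        }
        replace.run();
      } finally {
        paused = false;                   // the fix: always unpause
        pauseLock.notifyAll();
      }
    }
  }

  void acquire() {
    synchronized (pauseLock) {
      while (paused || !free) {
        try {
          pauseLock.wait(100);
        } catch (InterruptedException ignored) {}
      }
      free = false;
    }
  }

  void release() {
    synchronized (pauseLock) {
      free = true;
      pauseLock.notifyAll();
    }
  }
}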
solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java

@@ -179,124 +179,154 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
    */
   private int addDoc0(AddUpdateCommand cmd) throws IOException {
     int rc = -1;
-    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
-    try {
-      IndexWriter writer = iw.get();
-      addCommands.incrementAndGet();
-      addCommandsCumulative.incrementAndGet();
-
-      // if there is no ID field, don't overwrite
-      if (idField == null) {
-        cmd.overwrite = false;
-      }
-
-      try {
-        IndexSchema schema = cmd.getReq().getSchema();
-
-        if (cmd.overwrite) {
-          // Check for delete by query commands newer (i.e. reordered). This
-          // should always be null on a leader
-          List<UpdateLog.DBQ> deletesAfter = null;
-          if (ulog != null && cmd.version > 0) {
-            deletesAfter = ulog.getDBQNewer(cmd.version);
-          }
-
-          if (deletesAfter != null) {
-            log.info("Reordered DBQs detected. Update=" + cmd + " DBQs="
-                + deletesAfter);
-            List<Query> dbqList = new ArrayList<>(deletesAfter.size());
-            for (UpdateLog.DBQ dbq : deletesAfter) {
-              try {
-                DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
-                tmpDel.query = dbq.q;
-                tmpDel.version = -dbq.version;
-                dbqList.add(getQuery(tmpDel));
-              } catch (Exception e) {
-                log.error("Exception parsing reordered query : " + dbq, e);
-              }
-            }
-
-            addAndDelete(cmd, dbqList);
-          } else {
-            // normal update
-            Term updateTerm;
-            Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
-            boolean del = false;
-            if (cmd.updateTerm == null) {
-              updateTerm = idTerm;
-            } else {
-              // this is only used by the dedup update processor
-              del = true;
-              updateTerm = cmd.updateTerm;
-            }
-
-            if (cmd.isBlock()) {
-              writer.updateDocuments(updateTerm, cmd);
-            } else {
-              Document luceneDocument = cmd.getLuceneDocument();
-              // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
-              writer.updateDocument(updateTerm, luceneDocument);
-            }
-            // SolrCore.verbose("updateDocument",updateTerm,"DONE");
-
-            if (del) { // ensure id remains unique
-              BooleanQuery.Builder bq = new BooleanQuery.Builder();
-              bq.add(new BooleanClause(new TermQuery(updateTerm),
-                  Occur.MUST_NOT));
-              bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
-              writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
-            }
-
-            // Add to the transaction log *after* successfully adding to the
-            // index, if there was no error.
-            // This ordering ensures that if we log it, it's definitely been
-            // added to the the index.
-            // This also ensures that if a commit sneaks in-between, that we
-            // know everything in a particular
-            // log version was definitely committed.
-            if (ulog != null) ulog.add(cmd);
-          }
-        } else {
-          // allow duplicates
-          if (cmd.isBlock()) {
-            writer.addDocuments(cmd);
-          } else {
-            writer.addDocument(cmd.getLuceneDocument());
-          }
-
-          if (ulog != null) ulog.add(cmd);
-        }
-
-        if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
-          if (commitWithinSoftCommit) {
-            commitTracker.addedDocument(-1);
-            softCommitTracker.addedDocument(cmd.commitWithin);
-          } else {
-            softCommitTracker.addedDocument(-1);
-            commitTracker.addedDocument(cmd.commitWithin);
-          }
-        }
-
-        rc = 1;
-      } finally {
-        if (rc != 1) {
-          numErrors.incrementAndGet();
-          numErrorsCumulative.incrementAndGet();
-        } else {
-          numDocsPending.incrementAndGet();
-        }
-      }
-    } finally {
-      iw.decref();
-    }
-
-    return rc;
-  }
+
+    addCommands.incrementAndGet();
+    addCommandsCumulative.incrementAndGet();
+
+    // if there is no ID field, don't overwrite
+    if (idField == null) {
+      cmd.overwrite = false;
+    }
+
+    try {
+      if (cmd.overwrite) {
+        // Check for delete by query commands newer (i.e. reordered). This
+        // should always be null on a leader
+        List<UpdateLog.DBQ> deletesAfter = null;
+        if (ulog != null && cmd.version > 0) {
+          deletesAfter = ulog.getDBQNewer(cmd.version);
+        }
+
+        if (deletesAfter != null) {
+          addAndDelete(cmd, deletesAfter);
+        } else {
+          doNormalUpdate(cmd);
+        }
+      } else {
+        allowDuplicateUpdate(cmd);
+      }
+
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+        if (commitWithinSoftCommit) {
+          commitTracker.addedDocument(-1);
+          softCommitTracker.addedDocument(cmd.commitWithin);
+        } else {
+          softCommitTracker.addedDocument(-1);
+          commitTracker.addedDocument(cmd.commitWithin);
+        }
+      }
+
+      rc = 1;
+    } finally {
+      if (rc != 1) {
+        numErrors.incrementAndGet();
+        numErrorsCumulative.incrementAndGet();
+      } else {
+        numDocsPending.incrementAndGet();
+      }
+    }
+
+    return rc;
+  }
+
+  private void allowDuplicateUpdate(AddUpdateCommand cmd) throws IOException {
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+      if (cmd.isBlock()) {
+        writer.addDocuments(cmd);
+      } else {
+        writer.addDocument(cmd.getLuceneDocument());
+      }
+    } finally {
+      iw.decref();
+    }
+
+    if (ulog != null) ulog.add(cmd);
+  }
+
+  private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
+    Term updateTerm;
+    Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
+    boolean del = false;
+    if (cmd.updateTerm == null) {
+      updateTerm = idTerm;
+    } else {
+      // this is only used by the dedup update processor
+      del = true;
+      updateTerm = cmd.updateTerm;
+    }
+
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      if (cmd.isBlock()) {
+        writer.updateDocuments(updateTerm, cmd);
+      } else {
+        Document luceneDocument = cmd.getLuceneDocument();
+        // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
+        writer.updateDocument(updateTerm, luceneDocument);
+      }
+      // SolrCore.verbose("updateDocument",updateTerm,"DONE");
+
+      if (del) { // ensure id remains unique
+        BooleanQuery.Builder bq = new BooleanQuery.Builder();
+        bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
+        bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
+        writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
+      }
+    } finally {
+      iw.decref();
+    }
+
+    // Add to the transaction log *after* successfully adding to the
+    // index, if there was no error.
+    // This ordering ensures that if we log it, it's definitely been
+    // added to the index.
+    // This also ensures that if a commit sneaks in-between, that we
+    // know everything in a particular log version was definitely committed.
+    if (ulog != null) ulog.add(cmd);
+  }
+
+  private void addAndDelete(AddUpdateCommand cmd, List<UpdateLog.DBQ> deletesAfter) throws IOException {
+    log.info("Reordered DBQs detected. Update=" + cmd + " DBQs="
+        + deletesAfter);
+
+    List<Query> dbqList = new ArrayList<>(deletesAfter.size());
+    for (UpdateLog.DBQ dbq : deletesAfter) {
+      try {
+        DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
+        tmpDel.query = dbq.q;
+        tmpDel.version = -dbq.version;
+        dbqList.add(getQuery(tmpDel));
+      } catch (Exception e) {
+        log.error("Exception parsing reordered query : " + dbq, e);
+      }
+    }
+
+    Document luceneDocument = cmd.getLuceneDocument();
+    Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      // see comment in deleteByQuery
+      synchronized (solrCoreState.getUpdateLock()) {
+        writer.updateDocument(idTerm, luceneDocument);
+        for (Query q : dbqList) {
+          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
+        }
+      }
+    } finally {
+      iw.decref();
+    }
+
+    if (ulog != null) ulog.add(cmd, true);
+  }

   private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
     if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
       if (commitWithinSoftCommit) {
@@ -428,35 +458,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
   }

-  /** Add a document execute the deletes as atomically as possible */
-  private void addAndDelete(AddUpdateCommand cmd, List<Query> dbqList)
-      throws IOException {
-    Document luceneDocument = cmd.getLuceneDocument();
-    Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
-
-    // see comment in deleteByQuery
-    synchronized (solrCoreState.getUpdateLock()) {
-      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
-      try {
-        IndexWriter writer = iw.get();
-        writer.updateDocument(idTerm, luceneDocument);
-        for (Query q : dbqList) {
-          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
-        }
-      } finally {
-        iw.decref();
-      }
-
-      if (ulog != null) ulog.add(cmd, true);
-    }
-  }
-
   @Override
   public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
     mergeIndexesCommands.incrementAndGet();

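The DirectUpdateHandler2 refactor above addresses the other half of the problem: lock ordering. As far as can be read from the patch, the removed addAndDelete took solrCoreState.getUpdateLock() first and only then called getIndexWriter(), which blocks while a reload has the writer paused; the reload thread in turn waits for every outstanding writer reference to be decref'd, and a thread holding such a reference may itself be queued on the update lock — a three-way cycle. The new helpers (allowDuplicateUpdate, doNormalUpdate, addAndDelete) all acquire in one global order: writer reference first, update lock second. A sketch of that ordering rule, using the same calls as the patched code (the method name is illustrative, not part of the patch):

  private void orderedWriterAccess(SolrCore core) throws IOException {
    // 1: take the refcounted writer lease before any other lock
    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
    try {
      IndexWriter writer = iw.get();
      // 2: only now take the update lock. No thread ever blocks inside
      // getIndexWriter() while holding this lock, so a paused reload can
      // always drain outstanding leases and the cycle is broken.
      synchronized (solrCoreState.getUpdateLock()) {
        // writer.updateDocument(...) / writer.deleteDocuments(...) go here
      }
    } finally {
      iw.decref(); // lease returned even if the indexing calls throw
    }
  }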
solr/core/src/test/org/apache/solr/search/TestReloadDeadlock.java

@@ -0,0 +1,239 @@
package org.apache.solr.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.TimeUnits;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
// This test takes approx 30 seconds on a 2012 MacBook Pro running in IntelliJ. There should be a bunch of
// update threads dumped out all waiting on DefaultSolrCoreState.getIndexWriter,
// DistributedUpdateProcessor.versionAdd(DistributedUpdateProcessor.java:1016)
// and the like in a "real" failure. If we see false failures, we should probably bump this timeout.
// See SOLR-7836
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE)
@Nightly
public class TestReloadDeadlock extends TestRTGBase {
public static Logger log = LoggerFactory.getLogger(TestReloadDeadlock.class);
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-tlog.xml", "schema15.xml");
}
public static void ifVerbose(Object... args) {
if (VERBOSE) {
// if (!log.isDebugEnabled()) return;
StringBuilder sb = new StringBuilder("VERBOSE:");
for (Object o : args) {
sb.append(' ');
sb.append(o == null ? "(null)" : o.toString());
}
log.info(sb.toString());
}
}
@Test
public void testReloadDeadlock() throws Exception {
clearIndex();
assertU(commit());
final int commitPercent = 5 + random().nextInt(5);
final int deleteByQueryPercent = 20 + random().nextInt(20);
final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(50));
int nWriteThreads = 5 + random().nextInt(10);
// query variables
final AtomicLong reloads = new AtomicLong(50); // number of reloads. Increase this number to force failure.
ifVerbose("commitPercent", commitPercent, "deleteByQueryPercent", deleteByQueryPercent
, "ndocs", ndocs, "nWriteThreads", nWriteThreads, "reloads", reloads);
initModel(ndocs);
final AtomicBoolean areCommitting = new AtomicBoolean();
List<Thread> threads = new ArrayList<>();
final AtomicLong testVersion = new AtomicLong(0);
for (int i = 0; i < nWriteThreads; i++) {
Thread thread = new Thread("WRITER" + i) {
Random rand = new Random(random().nextInt());
@Override
public void run() {
try {
while (reloads.get() > 0) {
int oper = rand.nextInt(100);
if (oper < commitPercent) {
if (areCommitting.compareAndSet(false, true)) {
Map<Integer, DocInfo> newCommittedModel;
long version;
synchronized (TestReloadDeadlock.this) {
newCommittedModel = new HashMap<>(model); // take a snapshot
version = snapshotCount++;
}
ifVerbose("hardCommit start");
assertU(commit());
ifVerbose("hardCommit end");
synchronized (TestReloadDeadlock.this) {
// install this model snapshot only if it's newer than the current one
if (version >= committedModelClock) {
ifVerbose("installing new committedModel version=" + committedModelClock);
committedModel = newCommittedModel;
committedModelClock = version;
}
}
areCommitting.set(false);
}
continue;
}
int id;
if (rand.nextBoolean()) {
id = rand.nextInt(ndocs);
} else {
id = lastId; // reuse the last ID half of the time to force more race conditions
}
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
boolean before = rand.nextBoolean();
if (before) {
lastId = id;
}
DocInfo info = model.get(id);
long val = info.val;
long nextVal = Math.abs(val) + 1;
long version = testVersion.incrementAndGet();
// yield after getting the next version to increase the odds of updates happening out of order
if (rand.nextBoolean()) Thread.yield();
if (oper < commitPercent + deleteByQueryPercent) {
deleteByQuery(id, nextVal, version);
} else {
addDoc(id, nextVal, version);
}
if (!before) {
lastId = id;
}
}
} catch (Throwable e) {
reloads.set(-1L);
log.error("", e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
// The reload operation really doesn't need to happen from multiple threads, we just want it firing pretty often.
while (reloads.get() > 0) {
Thread.sleep(10 + random().nextInt(250));
reloads.decrementAndGet();
h.getCoreContainer().reload("collection1");
}
try {
for (Thread thread : threads) {
thread.join(10000); // Normally they'll all return immediately (or close to that).
}
} catch (InterruptedException ie) {
fail("Shouldn't have sat around here this long waiting for the threads to join.");
}
for (Thread thread : threads) { // Probably a silly test, but what the heck.
assertFalse("All threads should be dead, but at least thread " + thread.getName() + " is not", thread.isAlive());
}
}
private void addDoc(int id, long nextVal, long version) throws Exception {
ifVerbose("adding id", id, "val=", nextVal, "version", version);
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal),
"_version_", Long.toString(version)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
if (returnedVersion != null) {
assertEquals(version, returnedVersion.longValue());
}
// only update model if the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (version > currInfo.version) {
model.put(id, new DocInfo(version, nextVal));
}
}
ifVerbose("adding id", id, "val=", nextVal, "version", version, "DONE");
}
private void deleteByQuery(int id, long nextVal, long version) throws Exception {
ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version);
Long returnedVersion = deleteByQueryAndGetVersion("id:" + Integer.toString(id),
params("_version_", Long.toString(-version), DISTRIB_UPDATE_PARAM, FROM_LEADER));
// TODO: returning versions for these types of updates is redundant
// but if we do return, they had better be equal
if (returnedVersion != null) {
assertEquals(-version, returnedVersion.longValue());
}
// only update model if the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (Math.abs(version) > Math.abs(currInfo.version)) {
model.put(id, new DocInfo(version, -nextVal));
}
}
ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version, "DONE");
}
}
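A usage note: because the suite is annotated @Nightly, it is skipped in default test runs; with the Ant build of this era it would typically be invoked with something like `ant test -Dtestcase=TestReloadDeadlock -Dtests.nightly=true` (exact invocation depends on the checkout). The five-minute @TimeoutSuite acts as the deadlock detector here: a regression shows up as a suite timeout with the update threads parked in DefaultSolrCoreState.getIndexWriter, as the header comment describes.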