From 2676aa61b98c11b9ba3dc5c3f9f19ab556d73495 Mon Sep 17 00:00:00 2001
From: Erick Erickson
Date: Sun, 9 Aug 2015 03:38:08 +0000
Subject: [PATCH] SOLR-7836: Possible deadlock when closing refcounted index
 writers

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1694854 13f79535-47bb-0310-9956-ffa450edef68
---
 solr/CHANGES.txt                              |   3 +
 .../solr/update/DefaultSolrCoreState.java     | 101 +++----
 .../solr/update/DirectUpdateHandler2.java     | 277 +++++++++---------
 .../solr/search/TestReloadDeadlock.java       | 239 +++++++++++++++
 4 files changed, 432 insertions(+), 188 deletions(-)
 create mode 100644 solr/core/src/test/org/apache/solr/search/TestReloadDeadlock.java

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6028719a3a6..c06a3572b95 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -115,6 +115,9 @@ Bug Fixes
 * SOLR-7859: Fix usage of currentTimeMillis instead of nanoTime in multiple
   places, whitelist valid uses of currentTimeMillis (Ramkumar Aiyengar)
 
+* SOLR-7836: Possible deadlock when closing refcounted index writers.
+  (Jessica Cheng Mallet, Erick Erickson)
+
 Optimizations
 ----------------------
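The change is in two parts. In DefaultSolrCoreState (next diff), newIndexWriter() and closeIndexWriter() set pauseWriter = true and then wait for in-flight users of the writer to finish; previously, the SERVICE_UNAVAILABLE throw inside that wait loop bypassed the code that resets pauseWriter, so a close racing with a reload could leave the flag stuck and park every later getIndexWriter() caller forever. Both waits now sit inside a try whose finally unconditionally clears pauseWriter and notifies writerPauseLock. In DirectUpdateHandler2 (following diff), addAndDelete() used to take the update lock before asking SolrCoreState for the writer; it now pins the writer reference first, and addDoc0() is split into allowDuplicateUpdate(), doNormalUpdate(), and addAndDelete() so each path manages its own writer reference.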
core=" + coreName); - - while (!writerFree) { - try { - writerPauseLock.wait(100); - } catch (InterruptedException e) {} - - if (closed) { - throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, - "SolrCoreState already closed"); - } - } - - if (indexWriter != null) { - if (!rollback) { + try { + while (!writerFree) { try { - log.info("Closing old IndexWriter... core=" + coreName); - indexWriter.close(); - } catch (Exception e) { - SolrException.log(log, "Error closing old IndexWriter. core=" - + coreName, e); + writerPauseLock.wait(100); + } catch (InterruptedException e) { } - } else { - try { - log.info("Rollback old IndexWriter... core=" + coreName); - indexWriter.rollback(); - } catch (Exception e) { - SolrException.log(log, "Error rolling back old IndexWriter. core=" - + coreName, e); + + if (closed) { + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, + "SolrCoreState already closed"); } } + + if (indexWriter != null) { + if (!rollback) { + try { + log.info("Closing old IndexWriter... core=" + coreName); + indexWriter.close(); + } catch (Exception e) { + SolrException.log(log, "Error closing old IndexWriter. core=" + + coreName, e); + } + } else { + try { + log.info("Rollback old IndexWriter... core=" + coreName); + indexWriter.rollback(); + } catch (Exception e) { + SolrException.log(log, "Error rolling back old IndexWriter. core=" + + coreName, e); + } + } + } + } finally { + pauseWriter = false; + writerPauseLock.notifyAll(); } - } + } @Override diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index a57edab7bfe..cbdb2c492b0 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -179,124 +179,154 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState */ private int addDoc0(AddUpdateCommand cmd) throws IOException { int rc = -1; + + addCommands.incrementAndGet(); + addCommandsCumulative.incrementAndGet(); + + // if there is no ID field, don't overwrite + if (idField == null) { + cmd.overwrite = false; + } + try { + if (cmd.overwrite) { + // Check for delete by query commands newer (i.e. reordered). 
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index a57edab7bfe..cbdb2c492b0 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -179,124 +179,154 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
    */
   private int addDoc0(AddUpdateCommand cmd) throws IOException {
     int rc = -1;
+
+    addCommands.incrementAndGet();
+    addCommandsCumulative.incrementAndGet();
+
+    // if there is no ID field, don't overwrite
+    if (idField == null) {
+      cmd.overwrite = false;
+    }
+    try {
+      if (cmd.overwrite) {
+        // Check for delete by query commands newer (i.e. reordered). This
+        // should always be null on a leader
+        List<UpdateLog.DBQ> deletesAfter = null;
+        if (ulog != null && cmd.version > 0) {
+          deletesAfter = ulog.getDBQNewer(cmd.version);
+        }
+
+        if (deletesAfter != null) {
+          addAndDelete(cmd, deletesAfter);
+        } else {
+          doNormalUpdate(cmd);
+        }
+      } else {
+        allowDuplicateUpdate(cmd);
+      }
+
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
+        if (commitWithinSoftCommit) {
+          commitTracker.addedDocument(-1);
+          softCommitTracker.addedDocument(cmd.commitWithin);
+        } else {
+          softCommitTracker.addedDocument(-1);
+          commitTracker.addedDocument(cmd.commitWithin);
+        }
+      }
+
+      rc = 1;
+    } finally {
+      if (rc != 1) {
+        numErrors.incrementAndGet();
+        numErrorsCumulative.incrementAndGet();
+      } else {
+        numDocsPending.incrementAndGet();
+      }
+    }
+
+    return rc;
+  }
+
+  private void allowDuplicateUpdate(AddUpdateCommand cmd) throws IOException {
     RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
     try {
       IndexWriter writer = iw.get();
-      addCommands.incrementAndGet();
-      addCommandsCumulative.incrementAndGet();
-
-      // if there is no ID field, don't overwrite
-      if (idField == null) {
-        cmd.overwrite = false;
-      }
-
-      try {
-        IndexSchema schema = cmd.getReq().getSchema();
-
-        if (cmd.overwrite) {
-
-          // Check for delete by query commands newer (i.e. reordered). This
-          // should always be null on a leader
-          List<UpdateLog.DBQ> deletesAfter = null;
-          if (ulog != null && cmd.version > 0) {
-            deletesAfter = ulog.getDBQNewer(cmd.version);
-          }
-
-          if (deletesAfter != null) {
-            log.info("Reordered DBQs detected.  Update=" + cmd + " DBQs="
-                + deletesAfter);
-            List<Query> dbqList = new ArrayList<>(deletesAfter.size());
-            for (UpdateLog.DBQ dbq : deletesAfter) {
-              try {
-                DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
-                tmpDel.query = dbq.q;
-                tmpDel.version = -dbq.version;
-                dbqList.add(getQuery(tmpDel));
-              } catch (Exception e) {
-                log.error("Exception parsing reordered query : " + dbq, e);
-              }
-            }
-
-            addAndDelete(cmd, dbqList);
-          } else {
-            // normal update
-
-            Term updateTerm;
-            Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
-            boolean del = false;
-            if (cmd.updateTerm == null) {
-              updateTerm = idTerm;
-            } else {
-              // this is only used by the dedup update processor
-              del = true;
-              updateTerm = cmd.updateTerm;
-            }
-            if (cmd.isBlock()) {
-              writer.updateDocuments(updateTerm, cmd);
-            } else {
-              Document luceneDocument = cmd.getLuceneDocument();
-              // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
-              writer.updateDocument(updateTerm, luceneDocument);
-            }
-            // SolrCore.verbose("updateDocument",updateTerm,"DONE");
-
-            if (del) { // ensure id remains unique
-              BooleanQuery.Builder bq = new BooleanQuery.Builder();
-              bq.add(new BooleanClause(new TermQuery(updateTerm),
-                  Occur.MUST_NOT));
-              bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
-              writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
-            }
-
-            // Add to the transaction log *after* successfully adding to the
-            // index, if there was no error.
-            // This ordering ensures that if we log it, it's definitely been
-            // added to the the index.
-            // This also ensures that if a commit sneaks in-between, that we
-            // know everything in a particular
-            // log version was definitely committed.
-            if (ulog != null) ulog.add(cmd);
-          }
-
-        } else {
-          // allow duplicates
-          if (cmd.isBlock()) {
-            writer.addDocuments(cmd);
-          } else {
-            writer.addDocument(cmd.getLuceneDocument());
-          }
-
-          if (ulog != null) ulog.add(cmd);
-        }
-
-        if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
-          if (commitWithinSoftCommit) {
-            commitTracker.addedDocument(-1);
-            softCommitTracker.addedDocument(cmd.commitWithin);
-          } else {
-            softCommitTracker.addedDocument(-1);
-            commitTracker.addedDocument(cmd.commitWithin);
-          }
-        }
-
-        rc = 1;
-      } finally {
-        if (rc != 1) {
-          numErrors.incrementAndGet();
-          numErrorsCumulative.incrementAndGet();
-        } else {
-          numDocsPending.incrementAndGet();
-        }
+      if (cmd.isBlock()) {
+        writer.addDocuments(cmd);
+      } else {
+        writer.addDocument(cmd.getLuceneDocument());
       }
-
     } finally {
       iw.decref();
     }
-
-    return rc;
+
+    if (ulog != null) ulog.add(cmd);
   }
-
+
+  private void doNormalUpdate(AddUpdateCommand cmd) throws IOException {
+    Term updateTerm;
+    Term idTerm = new Term(cmd.isBlock() ? "_root_" : idField.getName(), cmd.getIndexedId());
+    boolean del = false;
+    if (cmd.updateTerm == null) {
+      updateTerm = idTerm;
+    } else {
+      // this is only used by the dedup update processor
+      del = true;
+      updateTerm = cmd.updateTerm;
+    }
+
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      if (cmd.isBlock()) {
+        writer.updateDocuments(updateTerm, cmd);
+      } else {
+        Document luceneDocument = cmd.getLuceneDocument();
+        // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
+        writer.updateDocument(updateTerm, luceneDocument);
+      }
+      // SolrCore.verbose("updateDocument",updateTerm,"DONE");
+
+      if (del) { // ensure id remains unique
+        BooleanQuery.Builder bq = new BooleanQuery.Builder();
+        bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
+        bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
+        writer.deleteDocuments(new DeleteByQueryWrapper(bq.build(), core.getLatestSchema()));
+      }
+    } finally {
+      iw.decref();
+    }
+
+    // Add to the transaction log *after* successfully adding to the
+    // index, if there was no error.
+    // This ordering ensures that if we log it, it's definitely been
+    // added to the index.
+    // This also ensures that if a commit sneaks in-between, that we
+    // know everything in a particular
+    // log version was definitely committed.
+    if (ulog != null) ulog.add(cmd);
+  }
+
+  private void addAndDelete(AddUpdateCommand cmd, List<UpdateLog.DBQ> deletesAfter) throws IOException {
+
+    log.info("Reordered DBQs detected.  Update=" + cmd + " DBQs=" + deletesAfter);
+    List<Query> dbqList = new ArrayList<>(deletesAfter.size());
+    for (UpdateLog.DBQ dbq : deletesAfter) {
+      try {
+        DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
+        tmpDel.query = dbq.q;
+        tmpDel.version = -dbq.version;
+        dbqList.add(getQuery(tmpDel));
+      } catch (Exception e) {
+        log.error("Exception parsing reordered query : " + dbq, e);
+      }
+    }
+
+    Document luceneDocument = cmd.getLuceneDocument();
+    Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+
+    RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
+    try {
+      IndexWriter writer = iw.get();
+
+      // see comment in deleteByQuery
+      synchronized (solrCoreState.getUpdateLock()) {
+        writer.updateDocument(idTerm, luceneDocument);
+        for (Query q : dbqList) {
+          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
+        }
+      }
+    } finally {
+      iw.decref();
+    }
+    if (ulog != null) ulog.add(cmd, true);
+  }
+
   private void updateDeleteTrackers(DeleteUpdateCommand cmd) {
     if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
       if (commitWithinSoftCommit) {
@@ -428,35 +458,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
   }
 
-
-  /** Add a document execute the deletes as atomically as possible */
-  private void addAndDelete(AddUpdateCommand cmd, List<Query> dbqList)
-      throws IOException {
-    Document luceneDocument = cmd.getLuceneDocument();
-    Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
-
-    // see comment in deleteByQuery
-    synchronized (solrCoreState.getUpdateLock()) {
-      RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
-      try {
-        IndexWriter writer = iw.get();
-        writer.updateDocument(idTerm, luceneDocument);
-
-        for (Query q : dbqList) {
-          writer.deleteDocuments(new DeleteByQueryWrapper(q, core.getLatestSchema()));
-        }
-      } finally {
-        iw.decref();
-      }
-
-      if (ulog != null) ulog.add(cmd, true);
-    }
-
-  }
-
-
-
-
   @Override
   public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
     mergeIndexesCommands.incrementAndGet();
+ */ + +import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; +import org.apache.lucene.util.LuceneTestCase.Nightly; +import org.apache.lucene.util.TimeUnits; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + +// This test takes approx 30 seconds on a 2012 MacBook Pro running in IntelliJ. There should be a bunch of +// update threads dumped out all waiting on DefaultSolrCoreState.getIndexWriter, +// DistributedUpdateProcessor.versionAdd(DistributedUpdateProcessor.java:1016) +// and the like in a "real" failure. If we have false=fails we should probably bump this timeout. +// See SOLR-7836 +@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) +@Nightly +public class TestReloadDeadlock extends TestRTGBase { + public static Logger log = LoggerFactory.getLogger(TestStressReorder.class); + + @BeforeClass + public static void beforeClass() throws Exception { + initCore("solrconfig-tlog.xml", "schema15.xml"); + } + + public static void ifVerbose(Object... args) { + if (VERBOSE) { + // if (!log.isDebugEnabled()) return; + StringBuilder sb = new StringBuilder("VERBOSE:"); + for (Object o : args) { + sb.append(' '); + sb.append(o == null ? "(null)" : o.toString()); + } + log.info(sb.toString()); + } + } + + @Test + public void testReloadDeadlock() throws Exception { + clearIndex(); + assertU(commit()); + + final int commitPercent = 5 + random().nextInt(5); + final int deleteByQueryPercent = 20 + random().nextInt(20); + final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(50)); + int nWriteThreads = 5 + random().nextInt(10); + + // query variables + final AtomicLong reloads = new AtomicLong(50); // number of reloads. Increase this number to force failure. 
+ + ifVerbose("commitPercent", commitPercent, "deleteByQueryPercent", deleteByQueryPercent + , "ndocs", ndocs, "nWriteThreads", nWriteThreads, "reloads", reloads); + + initModel(ndocs); + + final AtomicBoolean areCommitting = new AtomicBoolean(); + + List threads = new ArrayList<>(); + + final AtomicLong testVersion = new AtomicLong(0); + + for (int i = 0; i < nWriteThreads; i++) { + Thread thread = new Thread("WRITER" + i) { + Random rand = new Random(random().nextInt()); + + @Override + public void run() { + try { + while (reloads.get() > 0) { + int oper = rand.nextInt(100); + + if (oper < commitPercent) { + if (areCommitting.compareAndSet(false, true)) { + Map newCommittedModel; + long version; + + synchronized (TestReloadDeadlock.this) { + newCommittedModel = new HashMap<>(model); // take a snapshot + version = snapshotCount++; + } + + ifVerbose("hardCommit start"); + assertU(commit()); + ifVerbose("hardCommit end"); + + synchronized (TestReloadDeadlock.this) { + // install this model snapshot only if it's newer than the current one + if (version >= committedModelClock) { + ifVerbose("installing new committedModel version=" + committedModelClock); + committedModel = newCommittedModel; + committedModelClock = version; + } + } + areCommitting.set(false); + } + continue; + } + + + int id; + + if (rand.nextBoolean()) { + id = rand.nextInt(ndocs); + } else { + id = lastId; // reuse the last ID half of the time to force more race conditions + } + + // set the lastId before we actually change it sometimes to try and + // uncover more race conditions between writing and reading + boolean before = rand.nextBoolean(); + if (before) { + lastId = id; + } + + DocInfo info = model.get(id); + + long val = info.val; + long nextVal = Math.abs(val) + 1; + + long version = testVersion.incrementAndGet(); + + // yield after getting the next version to increase the odds of updates happening out of order + if (rand.nextBoolean()) Thread.yield(); + + if (oper < commitPercent + deleteByQueryPercent) { + deleteByQuery(id, nextVal, version); + } else { + addDoc(id, nextVal, version); + } + + if (!before) { + lastId = id; + } + } + } catch (Throwable e) { + reloads.set(-1L); + log.error("", e); + throw new RuntimeException(e); + } + } + }; + + threads.add(thread); + } + + for (Thread thread : threads) { + thread.start(); + } + + // The reload operation really doesn't need to happen from multiple threads, we just want it firing pretty often. + while (reloads.get() > 0) { + Thread.sleep(10 + random().nextInt(250)); + reloads.decrementAndGet(); + h.getCoreContainer().reload("collection1"); + } + + try { + for (Thread thread : threads) { + thread.join(10000); // Normally they'll all return immediately (or close to that). + } + } catch (InterruptedException ie) { + fail("Sholdn't have sat around here this long waiting for the threads to join."); + } + for (Thread thread : threads) { // Probably a silly test, but what the heck. 
+ assertFalse("All threads shoul be dead, but at least thread " + thread.getName() + " is not", thread.isAlive()); + } + } + + private void addDoc(int id, long nextVal, long version) throws Exception { + ifVerbose("adding id", id, "val=", nextVal, "version", version); + + Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), + "_version_", Long.toString(version)), params(DISTRIB_UPDATE_PARAM, FROM_LEADER)); + if (returnedVersion != null) { + assertEquals(version, returnedVersion.longValue()); + } + + // only update model if the version is newer + synchronized (model) { + DocInfo currInfo = model.get(id); + if (version > currInfo.version) { + model.put(id, new DocInfo(version, nextVal)); + } + } + + ifVerbose("adding id", id, "val=", nextVal, "version", version, "DONE"); + } + + private void deleteByQuery(int id, long nextVal, long version) throws Exception { + ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version); + + Long returnedVersion = deleteByQueryAndGetVersion("id:" + Integer.toString(id), + params("_version_", Long.toString(-version), DISTRIB_UPDATE_PARAM, FROM_LEADER)); + + // TODO: returning versions for these types of updates is redundant + // but if we do return, they had better be equal + if (returnedVersion != null) { + assertEquals(-version, returnedVersion.longValue()); + } + + // only update model if the version is newer + synchronized (model) { + DocInfo currInfo = model.get(id); + if (Math.abs(version) > Math.abs(currInfo.version)) { + model.put(id, new DocInfo(version, -nextVal)); + } + } + + ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version, "DONE"); + } +}
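A note on running the test: the class is annotated @Nightly, so the Lucene/Solr randomized test runner skips it in default runs and only executes it when nightly tests are enabled (the tests.nightly=true system property in builds of this vintage), and @TimeoutSuite caps the whole suite at five minutes, so a recurrence of the deadlock surfaces as a suite timeout with the thread dump described in the comment at the top of the file.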