LUCENE-3348: IndexWriter applies wrong deletes during concurrent flush-all

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1155278 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2011-08-09 09:22:15 +00:00
parent fda091ef24
commit 78d186e081
14 changed files with 629 additions and 351 deletions

View File

@ -548,6 +548,10 @@ Bug fixes
lucene version, you get the old buggy behavior for backwards compatibility.
(Trejkaz, Robert Muir)
* LUCENE-3348: Fix thread safety hazards in IndexWriter that could
rarely cause deletions to be incorrectly applied. (Yonik Seeley,
Simon Willnauer, Mike McCandless)
New Features
* LUCENE-3290: Added FieldInvertState.numUniqueTerms
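As a rough illustration of what "incorrectly applied" means in the LUCENE-3348 entry above (a hedged sketch with simplified stand-in names, not code from this commit): buffered term deletes carry an exclusive upper bound on the document ids they may remove, and the hazard was that, under a concurrent flush-all, a delete could end up associated with the wrong delete-queue generation and therefore the wrong bound.

    // Minimal sketch of the docIDUpto gating idea; names are hypothetical stand-ins.
    import java.util.HashMap;
    import java.util.Map;

    class BufferedTermDeletesSketch {
        // term -> exclusive upper bound on the doc ids this delete may remove
        private final Map<String, Integer> terms = new HashMap<String, Integer>();

        void addTerm(String term, int docIDUpto) {
            Integer current = terms.get(term);
            // Only grow the bound: a delete must never affect documents added after it.
            if (current == null || current < docIDUpto) {
                terms.put(term, docIDUpto);
            }
        }

        boolean applies(String term, int docID) {
            Integer upto = terms.get(term);
            return upto != null && docID < upto;
        }
    }

If the bound is recorded against the wrong generation, applies() can return true for documents added after the delete, which is the class of bug this commit addresses.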

View File

@ -93,7 +93,7 @@ class BufferedDeletes {
} else {
String s = "gen=" + gen;
if (numTermDeletes.get() != 0) {
s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ") terms=" + terms.keySet();
}
if (queries.size() != 0) {
s += " " + queries.size() + " deleted queries";

View File

@ -372,7 +372,8 @@ class BufferedDeletesStream {
DocsEnum docs = null;
assert checkDeleteTerm(null);
//System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
for (Term term : termsIter) {
// Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only
@ -401,6 +402,7 @@ class BufferedDeletesStream {
if (docsEnum != null) {
while (true) {
final int docID = docsEnum.nextDoc();
//System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID);
if (docID == DocsEnum.NO_MORE_DOCS) {
break;
}
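The comment in the hunk above ("Since we visit terms sorted, we gain performance by re-using the same TermsEnum ...") is the reason this loop iterates a sorted term set. The sketch below shows that idea generically, using plain collections instead of Lucene's TermsEnum/DocsEnum, so every name in it is a stand-in rather than this file's API.

    // Sketch: apply sorted deleted terms with one forward-only cursor over sorted postings.
    import java.util.BitSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.SortedSet;
    import java.util.TreeMap;

    class SortedDeleteApplierSketch {
        // pretend postings: term -> sorted doc ids containing that term
        final TreeMap<String, int[]> postings = new TreeMap<String, int[]>();

        void applyDeletes(SortedSet<String> deletedTerms, BitSet liveDocs) {
            // Because deletedTerms is sorted the same way as postings, a single
            // forward-only cursor is enough; we never restart the scan per term.
            Iterator<Map.Entry<String, int[]>> cursor = postings.entrySet().iterator();
            Map.Entry<String, int[]> current = cursor.hasNext() ? cursor.next() : null;
            for (String term : deletedTerms) {
                while (current != null && current.getKey().compareTo(term) < 0) {
                    current = cursor.hasNext() ? cursor.next() : null;
                }
                if (current != null && current.getKey().equals(term)) {
                    for (int doc : current.getValue()) {
                        liveDocs.clear(doc); // clearing a bit marks the doc as deleted here
                    }
                }
            }
        }
    }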

View File

@ -309,7 +309,7 @@ class DirectoryReader extends IndexReader implements Cloneable {
buffer.append('(');
final String segmentsFile = segmentInfos.getCurrentSegmentFileName();
if (segmentsFile != null) {
buffer.append(segmentsFile);
buffer.append(segmentsFile).append(":").append(segmentInfos.getVersion());
}
if (writer != null) {
buffer.append(":nrt");

View File

@ -165,7 +165,7 @@ final class DocumentsWriter {
}
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null) {
if (deleteQueue != null && !flushControl.isFullFlush()) {
synchronized (ticketQueue) {
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
@ -220,7 +220,7 @@ final class DocumentsWriter {
try {
if (infoStream != null) {
message("docWriter: abort");
message("DW: abort");
}
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
@ -324,7 +324,7 @@ final class DocumentsWriter {
final Term delTerm) throws CorruptIndexException, IOException {
boolean maybeMerge = preUpdate();
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
try {
@ -356,7 +356,8 @@ final class DocumentsWriter {
boolean maybeMerge = preUpdate();
final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
try {
@ -513,6 +514,9 @@ final class DocumentsWriter {
assert newSegment != null;
final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
final BufferedDeletes deletes = newSegment.segmentDeletes;
if (infoStream != null) {
message(Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
}
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {
// Segment private delete
@ -542,7 +546,10 @@ final class DocumentsWriter {
final boolean flushAllThreads()
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream != null) {
message(Thread.currentThread().getName() + " startFullFlush");
}
synchronized (this) {
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
@ -564,6 +571,9 @@ final class DocumentsWriter {
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (!anythingFlushed) { // apply deletes if we did not flush any document
if (infoStream != null) {
message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
synchronized (ticketQueue) {
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
@ -576,6 +586,9 @@ final class DocumentsWriter {
}
final void finishFullFlush(boolean success) {
if (infoStream != null) {
message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
}
assert setFlushingDeleteQueue(null);
if (success) {
// Release the flush lock
@ -609,7 +622,7 @@ final class DocumentsWriter {
next.lock();
try {
assert !next.isActive();
} finally {
} finally {
next.unlock();
}
}
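Two of the changes in this file work together: applyAllDeletes() now skips the delete queue that a full flush owns (the !flushControl.isFullFlush() guard), and indexing threads obtain their ThreadState through flushControl.obtainAndLock() so stale per-thread writers are enrolled in the flush instead of indexing against the old queue. Below is a minimal sketch of the first guard, with simplified stand-ins for the real internals (the actual ticketQueue holds FlushTicket objects, not strings).

    // Hedged sketch of the guard added in applyAllDeletes; all names are stand-ins.
    import java.util.ArrayDeque;
    import java.util.Deque;

    class DocumentsWriterSketch {
        static class DeleteQueueSketch {
            String freezeGlobalBuffer() { return "frozen global deletes"; }
        }

        private volatile boolean fullFlush;                 // owned by the flush-all path
        private final Deque<String> ticketQueue = new ArrayDeque<String>();

        void applyAllDeletes(DeleteQueueSketch deleteQueue) {
            // While a full flush owns the old delete queue, this asynchronous path must
            // not freeze and publish it again; the flush-all publishes it in order itself.
            if (deleteQueue != null && !fullFlush) {
                synchronized (ticketQueue) {
                    ticketQueue.add(deleteQueue.freezeGlobalBuffer());
                }
            }
        }

        void setFullFlush(boolean value) { fullFlush = value; }
    }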

View File

@ -16,6 +16,8 @@ package org.apache.lucene.index;
* License for the specific language governing permissions and limitations under
* the License.
*/
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
@ -110,6 +112,7 @@ final class DocumentsWriterDeleteQueue {
*/
void add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
add(termNode);
/*
* this is an update request where the term is the updated documents
@ -175,13 +178,14 @@ final class DocumentsWriterDeleteQueue {
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
/*
* The global buffer must be locked but we don't need to upate them if
* The global buffer must be locked but we don't need to update them if
* there is an update going on right now. It is sufficient to apply the
* deletes that have been added after the current in-flight global slices
* tail the next time we can get the lock!
*/
try {
if (updateSlice(globalSlice)) {
// System.out.println(Thread.currentThread() + ": apply globalSlice");
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
} finally {
@ -210,6 +214,7 @@ final class DocumentsWriterDeleteQueue {
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(
globalBufferedDeletes, false);
globalBufferedDeletes.clear();
@ -262,6 +267,7 @@ final class DocumentsWriterDeleteQueue {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
// System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
} while (current != sliceTail);
reset();
}
@ -330,6 +336,11 @@ final class DocumentsWriterDeleteQueue {
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm(item, docIDUpto);
}
@Override
public String toString() {
return "del=" + item;
}
}
private static final class QueryArrayNode extends Node<Query[]> {
@ -356,6 +367,11 @@ final class DocumentsWriterDeleteQueue {
bufferedDeletes.addTerm(term, docIDUpto);
}
}
@Override
public String toString() {
return "dels=" + Arrays.toString(item);
}
}
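The TermNode/QueryArrayNode classes above are nodes of a single linked list that every indexing thread appends to; each thread, and the global buffer, tracks a slice (a head/tail pair of nodes) and applies only the deletes added between those two pointers. The compact sketch below shows the slice idea with plain strings instead of typed delete nodes and a lock instead of the real class's CAS-based append, so its names are hypothetical.

    // Sketch of the delete-slice concept: apply only nodes appended between two pointers.
    import java.util.List;

    class DeleteSliceSketch {
        static final class Node {
            final String delete;
            volatile Node next;
            Node(String delete) { this.delete = delete; }
        }

        private volatile Node tail = new Node(null);   // sentinel; the list is never empty

        synchronized Node add(String delete) {         // real code appends lock-free via CAS
            Node node = new Node(delete);
            tail.next = node;
            tail = node;
            return node;
        }

        Node currentTail() { return tail; }

        /** Applies every delete added after 'from', up to and including 'to'. */
        static void applySlice(Node from, Node to, List<String> buffered) {
            Node current = from;
            while (current != to) {
                current = current.next;
                buffered.add(current.delete);
            }
        }
    }

When a flush needs the pending deletes, the real class applies the global slice and freezes the result into a FrozenBufferedDeletes packet, which is what freezeGlobalBuffer() hands to the flush.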

View File

@ -18,6 +18,7 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -362,10 +363,25 @@ public final class DocumentsWriterFlushControl {
return this.perThreadPool.getActiveThreadState();
}
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock(Thread
.currentThread(), documentsWriter);
if (perThread.isActive()
&& perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
addFlushableState(perThread);
}
// simply return the ThreadState even in a flush-all case since we already hold the lock
return perThread;
}
void markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
synchronized (this) {
assert !fullFlush;
assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
@ -373,9 +389,7 @@ public final class DocumentsWriterFlushControl {
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
final Iterator<ThreadState> allActiveThreads = perThreadPool
.getActivePerThreadsIterator();
final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
while (allActiveThreads.hasNext()) {
final ThreadState next = allActiveThreads.next();
next.lock();
@ -395,25 +409,7 @@ public final class DocumentsWriterFlushControl {
// this one is already a new DWPT
continue;
}
if (next.perThread.getNumDocsInRAM() > 0 ) {
final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
synchronized (this) {
if (!next.flushPending) {
setFlushPending(next);
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
toFlush.add(flushingDWPT);
}
} else {
if (closed) {
next.resetWriter(null); // make this state inactive
} else {
// get the new delete queue from DW
next.perThread.initialize();
}
}
addFlushableState(next);
} finally {
next.unlock();
}
@ -425,9 +421,55 @@ public final class DocumentsWriterFlushControl {
* blocking indexing.*/
pruneBlockedQueue(flushingQueue);
assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(toFlush);
flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear();
stallControl.updateStalled(this);
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
while (allActiveThreads.hasNext()) {
final ThreadState next = allActiveThreads.next();
next.lock();
try {
assert !next.isActive() || next.perThread.deleteQueue == queue;
} finally {
next.unlock();
}
}
return true;
}
private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
void addFlushableState(ThreadState perThread) {
if (documentsWriter.infoStream != null) {
documentsWriter.message("FC: " + Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
}
final DocumentsWriterPerThread dwpt = perThread.perThread;
assert perThread.isHeldByCurrentThread();
assert perThread.isActive();
assert fullFlush;
assert dwpt.deleteQueue != documentsWriter.deleteQueue;
if (dwpt.getNumDocsInRAM() > 0) {
synchronized(this) {
if (!perThread.flushPending) {
setFlushPending(perThread);
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
assert dwpt == flushingDWPT : "flushControl returned different DWPT";
fullFlushBuffer.add(flushingDWPT);
}
} else {
if (closed) {
perThread.resetWriter(null); // make this state inactive
} else {
dwpt.initialize();
}
}
}
/**
@ -502,7 +544,7 @@ public final class DocumentsWriterFlushControl {
/**
* Returns <code>true</code> if a full flush is currently running
*/
synchronized boolean isFullFlush() { // used by assert
synchronized boolean isFullFlush() {
return fullFlush;
}
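The new obtainAndLock()/addFlushableState() pair above is the core of the fix in this class: a ThreadState whose DWPT is still bound to the delete queue that a full flush is draining is stale, and it is checked out for flushing before any further documents or deletes reach it. A rough sketch of that staleness check follows, with simplified field and type names standing in for the real ones.

    // Sketch of detecting a stale per-thread writer during a full flush; names are stand-ins.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    class FlushControlSketch {
        static final class DeleteQueue {}
        static final class PerThreadState extends ReentrantLock {
            DeleteQueue deleteQueue;   // the queue this thread's writer is bound to
            int docsInRAM;
        }

        volatile DeleteQueue activeQueue = new DeleteQueue();
        private final List<PerThreadState> fullFlushBuffer = new ArrayList<PerThreadState>();

        PerThreadState obtainAndLock(PerThreadState state) {
            state.lock();
            if (state.deleteQueue != activeQueue) {
                // A flush-all cut over to a fresh queue; this writer is stale, so enroll
                // it for flushing before it indexes anything against the old queue.
                addFlushableState(state);
            }
            return state;              // caller keeps the lock, as in the real code
        }

        private void addFlushableState(PerThreadState state) {
            if (state.docsInRAM > 0) {
                synchronized (this) {
                    fullFlushBuffer.add(state);   // real code also checks the DWPT out for flush
                }
            } else {
                state.deleteQueue = activeQueue;  // nothing buffered: just rebind to the new queue
            }
        }
    }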

View File

@ -47,7 +47,7 @@ public class DocumentsWriterPerThread {
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
}
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@ -131,7 +131,7 @@ public class DocumentsWriterPerThread {
hasAborted = aborting = true;
try {
if (infoStream != null) {
message("docWriter: now abort");
message("now abort");
}
try {
consumer.abort();
@ -146,11 +146,11 @@ public class DocumentsWriterPerThread {
} finally {
aborting = false;
if (infoStream != null) {
message("docWriter: done abort");
message("done abort");
}
}
}
private final static boolean INFO_VERBOSE = false;
final DocumentsWriter parent;
final IndexWriter writer;
final Directory directory;
@ -223,8 +223,14 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
if (INFO_VERBOSE) {
message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
}
}
if (INFO_VERBOSE) {
message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
boolean success = false;
try {
try {
@ -265,8 +271,13 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
if (INFO_VERBOSE) {
message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
}
}
if (INFO_VERBOSE) {
message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
int docCount = 0;
try {
for(Document doc : docs) {
@ -552,4 +563,11 @@ public class DocumentsWriterPerThread {
this.infoStream = infoStream;
docState.infoStream = infoStream;
}
@Override
public String toString() {
return "DocumentsWriterPerThread [pendingDeletes=" + pendingDeletes
+ ", segment=" + segment + ", aborting=" + aborting + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
}
}

View File

@ -447,7 +447,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
message("now checkpoint \"" + segmentInfos.toString(directory) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable

View File

@ -354,7 +354,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
poolReaders = true;
final IndexReader r;
doBeforeFlush();
final boolean anySegmentFlushed;
boolean anySegmentFlushed = false;
/*
* for releasing a NRT reader we must ensure that
* DW doesn't add any segments or deletes until we are
@ -382,9 +382,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
message("return reader version=" + r.getVersion() + " reader=" + r);
}
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "getReader");
// never reached but javac disagrees:
return null;
} finally {
if (!success && infoStream != null) {
message("hit exception during while NRT reader");
message("hit exception during NRT reader");
}
// Done: finish the full flush!
docWriter.finishFullFlush(success);
@ -2341,6 +2345,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
if (infoStream != null) {
message("publishFlushedSegment");
}
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
@ -2354,6 +2362,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
if (infoStream != null) {
message("publish sets newSegment delGen=" + nextGen);
}
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
@ -2710,19 +2721,82 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
*/
public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
if (infoStream != null) {
message("prepareCommit: flush");
message(" index before flush " + segString());
}
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}
if (pendingCommit != null)
if (pendingCommit != null) {
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
}
if (infoStream != null)
message("prepareCommit: flush");
doBeforeFlush();
assert testPoint("startDoFlush");
SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false;
flush(true, true);
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
startCommit(commitUserData);
try {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
flushCount.incrementAndGet();
}
flushSuccess = true;
synchronized(this) {
maybeApplyDeletes(true);
readerPool.commit(segmentInfos);
// Must clone the segmentInfos while we still
// hold fullFlushLock and while sync'd so that
// no partial changes (eg a delete w/o
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = (SegmentInfos) segmentInfos.clone();
pendingCommitChangeCount = changeCount;
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
deleter.incRef(toCommit, false);
}
success = true;
} finally {
if (!success && infoStream != null) {
message("hit exception during prepareCommit");
}
// Done: finish the full flush!
docWriter.finishFullFlush(flushSuccess);
doAfterFlush();
}
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "prepareCommit");
}
if (anySegmentsFlushed) {
maybeMerge();
}
startCommit(toCommit, commitUserData);
}
// Used only by commit, below; lock order is commitLock -> IW
@ -2913,13 +2987,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
} else if (infoStream != null) {
message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
}
}
final synchronized void applyAllDeletes() throws IOException {
flushDeletesCount.incrementAndGet();
final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
.applyDeletes(readerPool, segmentInfos.asList());
final BufferedDeletesStream.ApplyDeletesResult result;
result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList());
if (result.anyDeletes) {
checkpoint();
}
@ -3811,7 +3884,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
* if it wasn't already. If that succeeds, then we
* prepare a new segments_N file but do not fully commit
* it. */
private void startCommit(Map<String,String> commitUserData) throws IOException {
private void startCommit(final SegmentInfos toSync, final Map<String,String> commitUserData) throws IOException {
assert testPoint("startStartCommit");
assert pendingCommit == null;
@ -3822,44 +3895,31 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
try {
if (infoStream != null)
if (infoStream != null) {
message("startCommit(): start");
final SegmentInfos toSync;
final long myChangeCount;
}
synchronized(this) {
assert lastCommitChangeCount <= changeCount;
myChangeCount = changeCount;
if (changeCount == lastCommitChangeCount) {
if (infoStream != null)
if (pendingCommitChangeCount == lastCommitChangeCount) {
if (infoStream != null) {
message(" skip startCommit(): no changes pending");
}
deleter.decRef(toSync);
return;
}
// First, we clone & incref the segmentInfos we intend
// to sync, then, without locking, we sync() all files
// referenced by toSync, in the background.
if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
readerPool.commit(segmentInfos);
toSync = (SegmentInfos) segmentInfos.clone();
if (infoStream != null) {
message("startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
}
assert filesExist(toSync);
if (commitUserData != null)
if (commitUserData != null) {
toSync.setUserData(commitUserData);
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
// merge completes which would otherwise have
// removed the files we are now syncing.
deleter.incRef(toSync, false);
}
}
assert testPoint("midStartCommit");
@ -3884,19 +3944,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
// an exception)
toSync.prepareCommit(directory);
pendingCommit = toSync;
pendingCommitSet = true;
pendingCommitChangeCount = myChangeCount;
pendingCommit = toSync;
}
if (infoStream != null)
if (infoStream != null) {
message("done all syncs");
}
assert testPoint("midStartCommitSuccess");
} finally {
synchronized(this) {
// Have our master segmentInfos record the
// generations we just prepared. We do this
// on error or success so we don't
@ -3908,6 +3967,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit {
message("hit exception committing segments file");
}
// Hit exception
deleter.decRef(toSync);
}
}
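The reworked prepareCommit()/startCommit() above moves the clone and incRef of segmentInfos inside the fullFlushLock plus IndexWriter sync block, so the commit point can never observe half of an updateDocument (a delete without its corresponding add). Below is a condensed sketch of that ordering, with placeholder types and methods standing in for SegmentInfos, the deleter and the flush machinery.

    // Hedged sketch of the prepareCommit ordering after this change; every name is a stand-in.
    class PrepareCommitSketch {
        static final class Segments {
            Segments copy() { return new Segments(); }
        }

        private final Object fullFlushLock = new Object();
        private Segments segmentInfos = new Segments();
        private Segments pendingCommit;

        void prepareCommit() {
            Segments toCommit;
            synchronized (fullFlushLock) {          // no DWPT can publish a flushed segment now
                flushAllThreads();                  // flush docs and freeze the global deletes
                synchronized (this) {
                    applyAllDeletes();              // apply them to the just-flushed segments
                    toCommit = segmentInfos.copy(); // snapshot while nothing can sneak in
                    incRefFiles(toCommit);          // protect the files until the commit finishes
                }
                finishFullFlush();
            }
            startCommit(toCommit);                  // sync files to disk outside the locks
        }

        // Placeholders for the real IndexWriter/DocumentsWriter machinery:
        private void flushAllThreads() {}
        private void applyAllDeletes() {}
        private void incRefFiles(Segments s) {}
        private void finishFullFlush() {}
        private void startCommit(Segments toSync) { pendingCommit = toSync; }
    }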

View File

@ -715,8 +715,14 @@ public final class SegmentInfo implements Cloneable {
if (getHasVectors()) {
s.append('v');
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Throwable e) {
// Messy: because getHasVectors may be used in an
// un-thread-safe way, and may attempt to open an fnm
// file that has since (legitimately) been deleted by
// IndexWriter, instead of throwing these exceptions
// up, just add v? to indicate we don't know if this
// segment has vectors:
s.append("v?");
}
s.append(docCount);

View File

@ -492,7 +492,7 @@ public class TestIndexWriterOnDiskFull extends LuceneTestCase {
fail("fake disk full IOExceptions not hit");
} catch (IOException ioe) {
// expected
assertTrue(ftdm.didFail1);
assertTrue(ftdm.didFail1 || ftdm.didFail2);
}
_TestUtil.checkIndex(dir);
ftdm.clearDoFail();

View File

@ -0,0 +1,383 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
public class TestStressNRT extends LuceneTestCase {
volatile IndexReader reader;
final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
long snapshotCount;
long committedModelClock;
volatile int lastId;
final String field = "val_l";
Object[] syncArr;
private void initModel(int ndocs) {
snapshotCount = 0;
committedModelClock = 0;
lastId = 0;
syncArr = new Object[ndocs];
for (int i=0; i<ndocs; i++) {
model.put(i, -1L);
syncArr[i] = new Object();
}
committedModel.putAll(model);
}
public void test() throws Exception {
// update variables
final int commitPercent = random.nextInt(20);
final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
final int deletePercent = random.nextInt(50);
final int deleteByQueryPercent = random.nextInt(25);
final int ndocs = atLeast(50);
final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5); // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
final boolean tombstones = random.nextBoolean();
// query variables
final AtomicLong operations = new AtomicLong(atLeast(50000)); // number of query operations to perform in total
final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
initModel(ndocs);
if (VERBOSE) {
System.out.println("\n");
System.out.println("TEST: commitPercent=" + commitPercent);
System.out.println("TEST: softCommitPercent=" + softCommitPercent);
System.out.println("TEST: deletePercent=" + deletePercent);
System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
System.out.println("TEST: ndocs=" + ndocs);
System.out.println("TEST: nWriteThreads=" + nWriteThreads);
System.out.println("TEST: nReadThreads=" + nReadThreads);
System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
System.out.println("TEST: tombstones=" + tombstones);
System.out.println("TEST: operations=" + operations);
System.out.println("\n");
}
final AtomicInteger numCommitting = new AtomicInteger();
List<Thread> threads = new ArrayList<Thread>();
Directory dir = newDirectory();
final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
writer.setDoRandomOptimizeAssert(false);
writer.w.setInfoStream(VERBOSE ? System.out : null);
writer.commit();
reader = IndexReader.open(dir);
for (int i=0; i<nWriteThreads; i++) {
Thread thread = new Thread("WRITER"+i) {
Random rand = new Random(random.nextInt());
@Override
public void run() {
try {
while (operations.get() > 0) {
int oper = rand.nextInt(100);
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
Map<Integer,Long> newCommittedModel;
long version;
IndexReader oldReader;
synchronized(TestStressNRT.this) {
newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
version = snapshotCount++;
oldReader = reader;
oldReader.incRef(); // increment the reference since we will use this for reopening
}
IndexReader newReader;
if (rand.nextInt(100) < softCommitPercent) {
// assertU(h.commit("softCommit","true"));
if (random.nextBoolean()) {
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
}
newReader = writer.getReader(true);
} else {
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
}
newReader = oldReader.reopen(writer.w, true);
}
} else {
// assertU(commit());
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
}
writer.commit();
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
}
newReader = oldReader.reopen();
}
// Code below assumes newReader comes w/
// extra ref:
if (newReader == oldReader) {
newReader.incRef();
}
oldReader.decRef();
synchronized(TestStressNRT.this) {
// install the new reader if it's newest (and check the current version since another reader may have already been installed)
//System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
assert newReader.getRefCount() > 0;
assert reader.getRefCount() > 0;
if (newReader.getVersion() > reader.getVersion()) {
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
}
reader.decRef();
reader = newReader;
// Silly: forces fieldInfos to be
// loaded so we don't hit IOE on later
// reader.toString
newReader.toString();
// install this snapshot only if it's newer than the current one
if (version >= committedModelClock) {
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
}
committedModel = newCommittedModel;
committedModelClock = version;
} else {
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
}
}
} else {
// if the same reader, don't decRef.
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
}
newReader.decRef();
}
}
}
numCommitting.decrementAndGet();
} else {
int id = rand.nextInt(ndocs);
Object sync = syncArr[id];
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
boolean before = random.nextBoolean();
if (before) {
lastId = id;
}
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
synchronized (sync) {
Long val = model.get(id);
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
// assertU("<delete><id>" + id + "</id></delete>");
// add tombstone first
if (tombstones) {
Document d = new Document();
d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
}
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
}
writer.deleteDocuments(new Term("id",Integer.toString(id)));
model.put(id, -nextVal);
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
//assertU("<delete><query>id:" + id + "</query></delete>");
// add tombstone first
if (tombstones) {
Document d = new Document();
d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
}
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
}
writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
model.put(id, -nextVal);
} else {
// assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
Document d = new Document();
d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
}
writer.updateDocument(new Term("id", Integer.toString(id)), d);
if (tombstones) {
// remove tombstone after new addition (this should be optional?)
writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
}
model.put(id, nextVal);
}
}
if (!before) {
lastId = id;
}
}
}
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
e.printStackTrace(System.out);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
for (int i=0; i<nReadThreads; i++) {
Thread thread = new Thread("READER"+i) {
Random rand = new Random(random.nextInt());
@Override
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
long val;
IndexReader r;
synchronized(TestStressNRT.this) {
val = committedModel.get(id);
r = reader;
r.incRef();
}
if (VERBOSE) {
System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
}
// sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
IndexSearcher searcher = new IndexSearcher(r);
Query q = new TermQuery(new Term("id",Integer.toString(id)));
TopDocs results = searcher.search(q, 10);
if (results.totalHits == 0 && tombstones) {
// if we couldn't find the doc, look for its tombstone
q = new TermQuery(new Term("id","-"+Integer.toString(id)));
results = searcher.search(q, 1);
if (results.totalHits == 0) {
if (val == -1L) {
// expected... no doc was added yet
r.decRef();
continue;
}
fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
}
}
if (results.totalHits == 0 && !tombstones) {
// nothing to do - we can't tell anything from a deleted doc without tombstones
} else {
// we should have found the document, or its tombstone
if (results.totalHits != 1) {
System.out.println("FAIL: hits id:" + id + " val=" + val);
for(ScoreDoc sd : results.scoreDocs) {
final Document doc = r.document(sd.doc);
System.out.println(" docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
}
fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
}
Document doc = searcher.doc(results.scoreDocs[0].doc);
long foundVal = Long.parseLong(doc.get(field));
if (foundVal < Math.abs(val)) {
fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
}
}
r.decRef();
}
} catch (Throwable e) {
operations.set(-1L);
System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
e.printStackTrace(System.out);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
writer.close();
if (VERBOSE) {
System.out.println("TEST: close reader=" + reader);
}
reader.close();
dir.close();
}
}

View File

@ -16,26 +16,12 @@
*/
package org.apache.solr.search;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.request.SolrQueryRequest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ -123,7 +109,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
// query variables
final int percentRealtimeQuery = 0; // realtime get is not implemented yet
final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level
final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level
int nReadThreads = 10;
initModel(ndocs);
@ -272,257 +258,5 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
for (Thread thread : threads) {
thread.join();
}
}
IndexReader reader;
@Ignore
@Test
public void testStressLuceneNRT() throws Exception {
// update variables
final int commitPercent = 10;
final int softCommitPercent = 50; // what percent of the commits are soft
final int deletePercent = 8;
final int deleteByQueryPercent = 4;
final int ndocs = 100;
int nWriteThreads = 10;
final int maxConcurrentCommits = 2; // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
final boolean tombstones = false;
// query variables
final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: temporarily high due to lack of stability
int nReadThreads = 10;
initModel(ndocs);
final AtomicInteger numCommitting = new AtomicInteger();
List<Thread> threads = new ArrayList<Thread>();
RAMDirectory dir = new RAMDirectory();
final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
writer.commit();
reader = IndexReader.open(dir);
for (int i=0; i<nWriteThreads; i++) {
Thread thread = new Thread("WRITER"+i) {
Random rand = new Random(random.nextInt());
@Override
public void run() {
try {
while (operations.get() > 0) {
int oper = rand.nextInt(100);
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
Map<Integer,Long> newCommittedModel;
long version;
IndexReader oldReader;
synchronized(TestRealTimeGet.this) {
newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
version = snapshotCount++;
oldReader = reader;
oldReader.incRef(); // increment the reference since we will use this for reopening
}
IndexReader newReader;
if (rand.nextInt(100) < softCommitPercent) {
// assertU(h.commit("softCommit","true"));
newReader = oldReader.reopen(writer, true);
} else {
// assertU(commit());
writer.commit();
newReader = oldReader.reopen();
}
synchronized(TestRealTimeGet.this) {
// install the new reader if it's newest (and check the current version since another reader may have already been installed)
if (newReader.getVersion() > reader.getVersion()) {
reader.decRef();
reader = newReader;
// install this snapshot only if it's newer than the current one
if (version >= committedModelClock) {
committedModel = newCommittedModel;
committedModelClock = version;
}
} else if (newReader != oldReader) {
// if the same reader, don't decRef.
newReader.decRef();
}
oldReader.decRef();
}
}
numCommitting.decrementAndGet();
continue;
}
int id = rand.nextInt(ndocs);
Object sync = syncArr[id];
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
boolean before = rand.nextBoolean();
if (before) {
lastId = id;
}
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
synchronized (sync) {
Long val = model.get(id);
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
// assertU("<delete><id>" + id + "</id></delete>");
// add tombstone first
if (tombstones) {
Document d = new Document();
d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
}
writer.deleteDocuments(new Term("id",Integer.toString(id)));
model.put(id, -nextVal);
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
//assertU("<delete><query>id:" + id + "</query></delete>");
// add tombstone first
if (tombstones) {
Document d = new Document();
d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
}
writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
model.put(id, -nextVal);
} else {
// assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
Document d = new Document();
d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
writer.updateDocument(new Term("id", Integer.toString(id)), d);
if (tombstones) {
// remove tombstone after new addition (this should be optional?)
writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
}
model.put(id, nextVal);
}
}
if (!before) {
lastId = id;
}
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
};
threads.add(thread);
}
for (int i=0; i<nReadThreads; i++) {
Thread thread = new Thread("READER"+i) {
Random rand = new Random(random.nextInt());
@Override
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
long val;
synchronized(TestRealTimeGet.this) {
val = committedModel.get(id);
}
IndexReader r;
synchronized(TestRealTimeGet.this) {
r = reader;
r.incRef();
}
// sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
IndexSearcher searcher = new IndexSearcher(r);
Query q = new TermQuery(new Term("id",Integer.toString(id)));
TopDocs results = searcher.search(q, 1);
if (results.totalHits == 0 && tombstones) {
// if we couldn't find the doc, look for it's tombstone
q = new TermQuery(new Term("id","-"+Integer.toString(id)));
results = searcher.search(q, 1);
if (results.totalHits == 0) {
if (val == -1L) {
// expected... no doc was added yet
continue;
}
fail("No documents or tombstones found for id " + id + ", expected at least " + val);
}
}
if (results.totalHits == 0 && !tombstones) {
// nothing to do - we can't tell anything from a deleted doc without tombstones
} else {
assertEquals(1, results.totalHits); // we should have found the document, or it's tombstone
Document doc = searcher.doc(results.scoreDocs[0].doc);
long foundVal = Long.parseLong(doc.get(field));
if (foundVal < Math.abs(val)) {
System.out.println("model_val="+val+" foundVal="+foundVal);
}
assertTrue(foundVal >= Math.abs(val));
}
r.decRef();
}
}
catch (Throwable e) {
operations.set(-1L);
SolrException.log(log,e);
fail(e.toString());
}
}
};
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
writer.close();
reader.close();
}
}