initial sequence numbers patch

Mike McCandless 2016-05-24 10:20:30 -04:00
parent 50c4f58276
commit 058970e72b
11 changed files with 437 additions and 76 deletions
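
In short, this patch changes IndexWriter's write paths (addDocument, updateDocument, deleteDocuments, prepareCommit, commit) to return a long sequence number rather than void or boolean. A rough usage sketch, assuming an already-configured IndexWriter (the setup is not part of this commit):

import java.io.IOException;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;

// Illustrative only, not part of the commit: after this patch every write
// operation reports a long sequence number, and later operations get larger numbers.
public class SequenceNumberDemo {
  // the IndexWriter is assumed to be configured elsewhere
  static void demo(IndexWriter w) throws IOException {
    long a = w.addDocument(new Document());
    long b = w.updateDocument(new Term("id", "1"), new Document());
    long c = w.deleteDocuments(new Term("id", "2"));
    long commitSeqNo = w.commit();   // commit (and prepareCommit) return a sequence number too

    assert a < b && b < c;           // totally ordered, as TestIndexingSequenceNumbers asserts
  }
}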

View File

@ -34,7 +34,7 @@ import org.apache.lucene.util.RamUsageEstimator;
* single segment. This is used to hold buffered pending
* deletes and updates against the to-be-flushed segment. Once the
* deletes and updates are pushed (on flush in DocumentsWriter), they
* are converted to a FrozenDeletes instance. */
* are converted to a FrozenBufferedUpdates instance. */
// NOTE: instances of this class are accessed either via a private
// instance on DocumentWriterPerThread, or via sync'd code by

View File

@ -141,18 +141,22 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
flushControl.doOnDelete();
// nocommit long
return applyAllDeletes(deleteQueue);
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
synchronized boolean deleteTerms(final Term... terms) throws IOException {
synchronized long deleteTerms(final Term... terms) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
long seqNo = deleteQueue.addDelete(terms);
flushControl.doOnDelete();
return applyAllDeletes( deleteQueue);
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
return seqNo;
}
synchronized boolean updateDocValues(DocValuesUpdate... updates) throws IOException {
@ -429,7 +433,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return postUpdate(flushingDWPT, hasEvents);
}
boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
final Term delTerm) throws IOException, AbortingException {
boolean hasEvents = preUpdate();
@ -437,6 +441,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
final long seqno;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
@ -446,7 +451,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
dwpt.updateDocument(doc, analyzer, delTerm);
seqno = dwpt.updateDocument(doc, analyzer, delTerm);
} catch (AbortingException ae) {
flushControl.doOnAbort(perThread);
dwpt.abort();
@ -463,7 +468,11 @@ final class DocumentsWriter implements Closeable, Accountable {
perThreadPool.release(perThread);
}
return postUpdate(flushingDWPT, hasEvents);
if (postUpdate(flushingDWPT, hasEvents)) {
return -seqno;
} else {
return seqno;
}
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
@ -587,20 +596,22 @@ final class DocumentsWriter implements Closeable, Accountable {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
boolean flushAllThreads()
long flushAllThreads()
throws IOException, AbortingException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFullFlush");
}
long seqNo;
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
seqNo = flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
@ -620,13 +631,17 @@ final class DocumentsWriter implements Closeable, Accountable {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
ticketQueue.addDeletes(flushingDeleteQueue);
}
}
ticketQueue.forcePurge(writer);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}
return anythingFlushed;
if (anythingFlushed) {
return -seqNo;
} else {
return seqNo;
}
}
void finishFullFlush(IndexWriter indexWriter, boolean success) {
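
Internally the patch piggybacks a boolean on the returned value: DocumentsWriter negates the sequence number when IndexWriter must also process events or a flush happened, which is why the counter starts at 1. A minimal sketch of that convention (helper names are illustrative, not from the patch):

// Illustrative helpers showing the sign convention used by deleteTerms,
// updateDocument and flushAllThreads above; not part of the commit.
final class SeqNoCodec {
  // seqNo must be >= 1 so its sign is free to carry the flag
  static long encode(long seqNo, boolean flushedOrHasEvents) {
    return flushedOrHasEvents ? -seqNo : seqNo;
  }

  static boolean needsProcessing(long returned) {
    return returned < 0;
  }

  static long seqNo(long returned) {
    return Math.abs(returned);
  }
}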

View File

@ -17,6 +17,7 @@
package org.apache.lucene.index;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
@ -76,23 +77,28 @@ final class DocumentsWriterDeleteQueue implements Accountable {
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
private long gen;
// only acquired to update the global deletes, pkg-private for access by tests:
final ReentrantLock globalBufferLock = new ReentrantLock();
final long generation;
final AtomicLong seqNo;
DocumentsWriterDeleteQueue() {
this(0);
// seqNo must start at 1 because some APIs negate this to encode a boolean
this(0, 1);
}
DocumentsWriterDeleteQueue(long generation) {
this(new BufferedUpdates(), generation);
DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
this(new BufferedUpdates(), generation, startSeqNo);
}
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation) {
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
this.seqNo = new AtomicLong(startSeqNo);
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@ -101,28 +107,31 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalSlice = new DeleteSlice(tail);
}
void addDelete(Query... queries) {
add(new QueryArrayNode(queries));
long addDelete(Query... queries) {
long seqNo = add(new QueryArrayNode(queries));
tryApplyGlobalSlice();
return seqNo;
}
void addDelete(Term... terms) {
add(new TermArrayNode(terms));
long addDelete(Term... terms) {
long seqNo = add(new TermArrayNode(terms));
tryApplyGlobalSlice();
return seqNo;
}
void addDocValuesUpdates(DocValuesUpdate... updates) {
add(new DocValuesUpdatesNode(updates));
long addDocValuesUpdates(DocValuesUpdate... updates) {
long seqNo = add(new DocValuesUpdatesNode(updates));
tryApplyGlobalSlice();
return seqNo;
}
/**
* invariant for document update
*/
void add(Term term, DeleteSlice slice) {
long add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
add(termNode);
long seqNo = add(termNode);
/*
* this is an update request where the term is the updated documents
* delTerm. in that case we need to guarantee that this insert is atomic
@ -137,9 +146,12 @@ final class DocumentsWriterDeleteQueue implements Accountable {
assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
// we can do it just every n times or so?
return seqNo;
}
void add(Node<?> item) {
// nocommit can we remove the sync'd
synchronized long add(Node<?> newNode) {
/*
* this non-blocking / 'wait-free' linked list add was inspired by Apache
* Harmony's ConcurrentLinkedQueue Implementation.
@ -157,18 +169,18 @@ final class DocumentsWriterDeleteQueue implements Accountable {
tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
} else {
/*
* we are in quiescent state and can try to insert the item to the
* we are in quiescent state and can try to insert the new node to the
* current tail if we fail to insert we just retry the operation since
* somebody else has already added its item
*/
if (currentTail.casNext(null, item)) {
if (currentTail.casNext(null, newNode)) {
/*
* now that we are done we need to advance the tail while another
* thread could have advanced it already so we can ignore the return
* type of this CAS call
*/
tailUpdater.compareAndSet(this, currentTail, item);
return;
tailUpdater.compareAndSet(this, currentTail, newNode);
return seqNo.getAndIncrement();
}
}
}
@ -230,8 +242,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(
globalBufferedUpdates, false);
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
} finally {
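
Every enqueue path above now returns a sequence number taken from the queue's AtomicLong once the new node is linked onto the tail; add() is synchronized for now (the nocommit asks whether that can be relaxed). A simplified, self-contained sketch of the append-then-ticket pattern, using illustrative names rather than Lucene's Node/updater machinery:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of DocumentsWriterDeleteQueue.add(Node): CAS the new node onto
// the tail, then take the next sequence number. Names and structure are illustrative only.
final class SimpleSeqNoQueue<T> {
  static final class Node<T> {
    final T item;
    final AtomicReference<Node<T>> next = new AtomicReference<>();
    Node(T item) { this.item = item; }
  }

  // starts at 1 because callers negate the value to encode a boolean
  private final AtomicLong seqNo = new AtomicLong(1);
  private final AtomicReference<Node<T>> tail = new AtomicReference<>(new Node<>(null)); // sentinel

  synchronized long add(T item) {
    Node<T> newNode = new Node<>(item);
    while (true) {
      Node<T> currentTail = tail.get();
      Node<T> tailNext = currentTail.next.get();
      if (tailNext != null) {
        // tail is lagging behind; help advance it and retry
        tail.compareAndSet(currentTail, tailNext);
      } else if (currentTail.next.compareAndSet(null, newNode)) {
        // linked; another thread may advance the tail first, so ignore the result
        tail.compareAndSet(currentTail, newNode);
        return seqNo.getAndIncrement();
      }
    }
  }
}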

View File

@ -141,8 +141,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
private void commitPerThreadBytes(ThreadState perThread) {
final long delta = perThread.dwpt.bytesUsed()
- perThread.bytesUsed;
final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed;
perThread.bytesUsed += delta;
/*
* We need to differentiate here if we are pending since setFlushPending
@ -167,8 +166,7 @@ final class DocumentsWriterFlushControl implements Accountable {
return true;
}
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
boolean isUpdate) {
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
try {
commitPerThreadBytes(perThread);
if (!perThread.flushPending) {
@ -471,8 +469,9 @@ final class DocumentsWriterFlushControl implements Accountable {
}
}
void markForFullFlush() {
long markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
long seqNo;
synchronized (this) {
assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
@ -480,7 +479,13 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
// nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
// have to get this next delete queue?
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;
}
final int limit = perThreadPool.getActiveThreadStateCount();
@ -520,6 +525,7 @@ final class DocumentsWriterFlushControl implements Accountable {
updateStallState();
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
return seqNo;
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
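
For reference, markForFullFlush() reserves one sequence number per active ThreadState before installing the new delete queue, presumably to leave room for documents already in flight against the old queue; the nocommit above questions whether that gap is always enough. With hypothetical numbers:

// Hypothetical values, purely to illustrate the arithmetic in markForFullFlush():
long nextSeqNoOnOldQueue = 100;   // documentsWriter.deleteQueue.seqNo.get()
int activeThreadStates = 8;       // perThreadPool.getActiveThreadStateCount()

long seqNo = nextSeqNoOnOldQueue + activeThreadStates;   // 108, returned to flushAllThreads
long newQueueStartSeqNo = seqNo + 1;                      // 109, startSeqNo of the new delete queue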

View File

@ -175,7 +175,6 @@ class DocumentsWriterPerThread {
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
pendingUpdates.clear();
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
@ -210,7 +209,7 @@ class DocumentsWriterPerThread {
}
}
public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
reserveOneDoc();
@ -241,7 +240,8 @@ class DocumentsWriterPerThread {
numDocsInRAM++;
}
}
finishDocument(delTerm);
return finishDocument(delTerm);
}
public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
@ -291,6 +291,8 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
}
// nocommit return seqNo here
} finally {
if (!allDocsIndexed && !aborted) {
// the iterator threw an exception that is not aborting
@ -308,7 +310,7 @@ class DocumentsWriterPerThread {
return docCount;
}
private void finishDocument(Term delTerm) {
private long finishDocument(Term delTerm) {
/*
* here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document
@ -318,11 +320,14 @@ class DocumentsWriterPerThread {
* since we updated the slice the last time.
*/
boolean applySlice = numDocsInRAM != 0;
long seqNo;
if (delTerm != null) {
deleteQueue.add(delTerm, deleteSlice);
seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
// nocommit we don't need to increment here?
seqNo = deleteQueue.seqNo.get();
}
if (applySlice) {
@ -331,6 +336,8 @@ class DocumentsWriterPerThread {
deleteSlice.reset();
}
++numDocsInRAM;
return seqNo;
}
// Buffer a specific docID for deletion. Currently only

View File

@ -229,19 +229,4 @@ final class DocumentsWriterPerThreadPool {
synchronized int getMaxThreadStates() {
return threadStates.size();
}
/**
* Returns the ThreadState with the minimum estimated number of threads
* waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
* is yet visible to the calling thread.
*/
ThreadState minContendedThreadState() {
ThreadState minThreadState = null;
for (ThreadState state : threadStates) {
if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
minThreadState = state;
}
}
return minThreadState;
}
}

View File

@ -266,6 +266,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private List<SegmentCommitInfo> rollbackSegments; // list of segmentInfo we will fallback to if the commit fails
volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
volatile long pendingSeqNo;
volatile long pendingCommitChangeCount;
private Collection<String> filesToCommit;
@ -425,7 +426,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
synchronized (fullFlushLock) {
try {
anyChanges = docWriter.flushAllThreads();
// nocommit should we make this available in the returned NRT reader?
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anyChanges = true;
seqNo = -seqNo;
} else {
anyChanges = false;
}
if (!anyChanges) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@ -1283,8 +1291,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
public void addDocument(Iterable<? extends IndexableField> doc) throws IOException {
updateDocument(null, doc);
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
return updateDocument(null, doc);
}
/**
@ -1447,14 +1455,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
public void deleteDocuments(Term... terms) throws IOException {
public long deleteDocuments(Term... terms) throws IOException {
ensureOpen();
try {
if (docWriter.deleteTerms(terms)) {
long seqNo = docWriter.deleteTerms(terms);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true, false);
}
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Term..)");
// dead code but javac disagrees:
return -1;
}
}
@ -1500,15 +1514,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen();
try {
boolean success = false;
try {
if (docWriter.updateDocument(doc, analyzer, term)) {
long seqNo = docWriter.updateDocument(doc, analyzer, term);
if (seqNo < 0) {
seqNo = - seqNo;
processEvents(true, false);
}
success = true;
return seqNo;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
@ -1518,6 +1535,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
// dead code but javac disagrees:
return -1;
}
}
@ -2807,12 +2827,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* will internally call prepareCommit.
*/
@Override
public final void prepareCommit() throws IOException {
public final long prepareCommit() throws IOException {
ensureOpen();
prepareCommitInternal(config.getMergePolicy());
pendingSeqNo = prepareCommitInternal(config.getMergePolicy());
return pendingSeqNo;
}
private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
private long prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {
ensureOpen(false);
@ -2833,6 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
testPoint("startDoFlush");
SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false;
long seqNo;
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
@ -2844,7 +2866,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean flushSuccess = false;
boolean success = false;
try {
anySegmentsFlushed = docWriter.flushAllThreads();
seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anySegmentsFlushed = true;
seqNo = -seqNo;
}
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@ -2898,6 +2924,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "prepareCommit");
// dead code but javac disagrees:
seqNo = -1;
}
boolean success = false;
@ -2907,6 +2936,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
startCommit(toCommit);
success = true;
return seqNo;
} finally {
if (!success) {
synchronized (this) {
@ -2983,9 +3013,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @see #prepareCommit
*/
@Override
public final void commit() throws IOException {
public final long commit() throws IOException {
ensureOpen();
commitInternal(config.getMergePolicy());
// nocommit should we put seq no into sis?
return commitInternal(config.getMergePolicy());
}
/** Returns true if there may be changes that have not been
@ -3001,7 +3032,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
}
private final void commitInternal(MergePolicy mergePolicy) throws IOException {
private final long commitInternal(MergePolicy mergePolicy) throws IOException {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: start");
@ -3014,18 +3045,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "commit: enter lock");
}
long seqNo;
if (pendingCommit == null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: now prepare");
}
prepareCommitInternal(mergePolicy);
seqNo = prepareCommitInternal(mergePolicy);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: already prepared");
}
seqNo = pendingSeqNo;
}
finishCommit();
return seqNo;
}
}
@ -3167,7 +3203,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
anyChanges = docWriter.flushAllThreads();
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
} else {
anyChanges = false;
}
if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();

View File

@ -421,7 +421,7 @@ public final class StandardDirectoryReader extends DirectoryReader {
@Override
public String toString() {
return "DirectoryReader.ReaderCommit(" + segmentsFileName + ")";
return "StandardDirectoryReader.ReaderCommit(" + segmentsFileName + " files=" + files + ")";
}
@Override

View File

@ -37,6 +37,7 @@ import org.apache.lucene.store.Directory;
*
* @lucene.experimental */
// nocommit removeme
public class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);

View File

@ -34,7 +34,7 @@ public interface TwoPhaseCommit {
* 2-phase commit fails, {@link #rollback()} is called to discard all changes
* since last successful commit.
*/
public void prepareCommit() throws IOException;
public long prepareCommit() throws IOException;
/**
* The second phase of a 2-phase commit. Implementations should ideally do
@ -42,7 +42,7 @@ public interface TwoPhaseCommit {
* after it returns, the caller can assume that the changes were successfully
* committed to the underlying storage.
*/
public void commit() throws IOException;
public long commit() throws IOException;
/**
* Discards any changes that have occurred since the last commit. In a 2-phase
@ -50,6 +50,7 @@ public interface TwoPhaseCommit {
* {@link #prepareCommit()}, this method is used to roll all other objects
* back to their previous state.
*/
// nocommit return long?
public void rollback() throws IOException;
}

View File

@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
public class TestIndexingSequenceNumbers extends LuceneTestCase {
public void testBasic() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testAfterRefresh() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
DirectoryReader.open(w).close();
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testAfterCommit() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
w.commit();
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testStressUpdateSameID() throws Exception {
int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) {
Directory dir = newDirectory();
// nocommit use RandomIndexWriter
final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 5)];
final CountDownLatch startingGun = new CountDownLatch(1);
final long[] seqNos = new long[threads.length];
final Term id = new Term("id", "id");
// multiple threads update the same document
for(int i=0;i<threads.length;i++) {
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "id", Field.Store.NO));
startingGun.await();
for(int j=0;j<100;j++) {
seqNos[threadID] = w.updateDocument(id, doc);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
// now confirm that the reported sequence numbers agree with the index:
int maxThread = 0;
Set<Long> allSeqNos = new HashSet<>();
for(int i=0;i<threads.length;i++) {
allSeqNos.add(seqNos[i]);
if (seqNos[i] > seqNos[maxThread]) {
maxThread = i;
}
}
// make sure all sequence numbers were different
assertEquals(threads.length, allSeqNos.size());
DirectoryReader r = DirectoryReader.open(w);
IndexSearcher s = newSearcher(r);
TopDocs hits = s.search(new TermQuery(id), 1);
assertEquals(1, hits.totalHits);
Document doc = r.document(hits.scoreDocs[0].doc);
assertEquals(maxThread, doc.getField("thread").numericValue().intValue());
r.close();
w.close();
dir.close();
}
}
static class Operation {
// 1 = delete, 2 = update; commit ops are kept in the separate commits list and never read this field
byte what;
int id;
int threadID;
long seqNo;
}
public void testStressConcurrentCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
// nocommit use RandomIndexWriter
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 5);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
List<List<Operation>> threadOps = new ArrayList<>();
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
final AtomicInteger opsSinceCommit = new AtomicInteger();
// multiple threads update the same set of documents, and we randomly commit
for(int i=0;i<threads.length;i++) {
final List<Operation> ops = new ArrayList<>();
threadOps.add(ops);
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
for(int i=0;i<opCount;i++) {
Operation op = new Operation();
op.threadID = threadID;
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
// nocommit why does this sometimes fail :)
//if (w.hasUncommittedChanges()) {
if (opsSinceCommit.get() > numThreads) {
op.seqNo = w.commit();
commits.add(op);
opsSinceCommit.set(0);
}
//System.out.println("done commit seqNo=" + op.seqNo);
}
} else {
op.id = random().nextInt(idCount);
Term idTerm = new Term("id", "" + op.id);
if (random().nextInt(10) == 1) {
op.what = 1;
op.seqNo = w.deleteDocuments(idTerm);
} else {
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
op.seqNo = w.updateDocument(idTerm, doc);
op.what = 2;
}
ops.add(op);
opsSinceCommit.getAndIncrement();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
Operation commitOp = new Operation();
synchronized(commitLock) {
commitOp.seqNo = w.commit();
commits.add(commitOp);
}
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
int[] expectedThreadIDs = new int[idCount];
long[] seqNos = new long[idCount];
//System.out.println("TEST: " + commits.size() + " commits");
for(int i=0;i<commits.size();i++) {
// this commit point should reflect all operations <= this seqNo
long commitSeqNo = commits.get(i).seqNo;
//System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
Arrays.fill(expectedThreadIDs, -1);
Arrays.fill(seqNos, 0);
for(int threadID=0;threadID<threadOps.size();threadID++) {
long lastSeqNo = 0;
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
if (op.what == 2) {
expectedThreadIDs[op.id] = threadID;
} else {
expectedThreadIDs[op.id] = -1;
}
}
assertTrue(op.seqNo >= lastSeqNo);
lastSeqNo = op.seqNo;
}
}
DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
IndexSearcher s = new IndexSearcher(r);
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
if (expectedThreadIDs[id] != -1) {
assertEquals(1, hits.totalHits);
Document doc = r.document(hits.scoreDocs[0].doc);
int actualThreadID = doc.getField("thread").numericValue().intValue();
if (expectedThreadIDs[id] != actualThreadID) {
System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
}
}
}
assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
}
} else if (hits.totalHits != 0) {
System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
}
}
}
assertEquals(0, hits.totalHits);
}
}
w.close();
r.close();
}
dir.close();
}
// nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same
}