LUCENE-7302: Merge branch 'sequence_numbers'

Mike McCandless 2016-06-05 05:05:46 -04:00
commit b1fb142af0
23 changed files with 1111 additions and 417 deletions

View File: lucene/CHANGES.txt

@@ -30,6 +30,10 @@ New Features
  applicable and supported when copying files from another FSDirectory in
  Directory#copyFrom. (Simon Willnauer)

+* LUCENE-7302: IndexWriter methods that change the index now return a
+  long "sequence number" indicating the effective equivalent
+  single-threaded execution order (Mike McCandless)

API Changes

* LUCENE-7184: Refactor LatLonPoint encoding methods to new GeoEncodingUtils
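For illustration, a minimal sketch of the new API shape this entry describes (the Directory and Analyzer setup is assumed boilerplate, not part of this commit):

    // assuming dir and analyzer are already set up:
    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(analyzer));

    Document doc = new Document();
    doc.add(new StringField("id", "1", Field.Store.NO));

    long addSeqNo = writer.addDocument(doc);                      // returned void before this change
    long delSeqNo = writer.deleteDocuments(new Term("id", "1"));  // likewise

    // Sequence numbers grow with the effective order of operations,
    // so here delSeqNo > addSeqNo.
    writer.close();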

View File: lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java

@@ -34,7 +34,7 @@ import org.apache.lucene.util.RamUsageEstimator;
 * single segment. This is used to hold buffered pending
 * deletes and updates against the to-be-flushed segment. Once the
 * deletes and updates are pushed (on flush in DocumentsWriter), they
- * are converted to a FrozenDeletes instance. */
+ * are converted to a FrozenBufferedUpdates instance. */

// NOTE: instances of this class are accessed either via a private
// instance on DocumentWriterPerThread, or via sync'd code by

@@ -158,9 +158,12 @@ class BufferedUpdates {
  private final static boolean VERBOSE_DELETES = false;

  long gen;
+ final String segmentName;

- public BufferedUpdates() {
+ public BufferedUpdates(String segmentName) {
    this.bytesUsed = new AtomicLong();
+   this.segmentName = segmentName;
  }

  @Override

View File: lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java

@@ -136,30 +136,40 @@ final class DocumentsWriter implements Closeable, Accountable {
    flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
  }

- synchronized boolean deleteQueries(final Query... queries) throws IOException {
+ synchronized long deleteQueries(final Query... queries) throws IOException {
    // TODO why is this synchronized?
    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
-   deleteQueue.addDelete(queries);
+   long seqNo = deleteQueue.addDelete(queries);
    flushControl.doOnDelete();
-   return applyAllDeletes(deleteQueue);
+   if (applyAllDeletes(deleteQueue)) {
+     seqNo = -seqNo;
+   }
+   return seqNo;
  }

  // TODO: we could check w/ FreqProxTermsWriter: if the
  // term doesn't exist, don't bother buffering into the
  // per-DWPT map (but still must go into the global map)
- synchronized boolean deleteTerms(final Term... terms) throws IOException {
+ synchronized long deleteTerms(final Term... terms) throws IOException {
    // TODO why is this synchronized?
    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
-   deleteQueue.addDelete(terms);
+   long seqNo = deleteQueue.addDelete(terms);
    flushControl.doOnDelete();
-   return applyAllDeletes( deleteQueue);
+   if (applyAllDeletes(deleteQueue)) {
+     seqNo = -seqNo;
+   }
+   return seqNo;
  }

- synchronized boolean updateDocValues(DocValuesUpdate... updates) throws IOException {
+ synchronized long updateDocValues(DocValuesUpdate... updates) throws IOException {
    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
-   deleteQueue.addDocValuesUpdates(updates);
+   long seqNo = deleteQueue.addDocValuesUpdates(updates);
    flushControl.doOnDelete();
-   return applyAllDeletes(deleteQueue);
+   if (applyAllDeletes(deleteQueue)) {
+     seqNo = -seqNo;
+   }
+   return seqNo;
  }

  DocumentsWriterDeleteQueue currentDeleteSession() {

@@ -247,6 +257,10 @@ final class DocumentsWriter implements Closeable, Accountable {
      abortedDocCount += abortThreadState(perThread);
    }
    deleteQueue.clear();
+   // jump over any possible in flight ops:
+   deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
    flushControl.abortPendingFlushes();
    flushControl.waitForFlush();
    success = true;

@@ -393,13 +407,14 @@ final class DocumentsWriter implements Closeable, Accountable {
    }
  }

- boolean updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
+ long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
                          final Term delTerm) throws IOException, AbortingException {
    boolean hasEvents = preUpdate();

    final ThreadState perThread = flushControl.obtainAndLock();
    final DocumentsWriterPerThread flushingDWPT;
+   final long seqNo;
    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:

@@ -409,7 +424,7 @@ final class DocumentsWriter implements Closeable, Accountable {
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
-       dwpt.updateDocuments(docs, analyzer, delTerm);
+       seqNo = dwpt.updateDocuments(docs, analyzer, delTerm);
      } catch (AbortingException ae) {
        flushControl.doOnAbort(perThread);
        dwpt.abort();

@@ -426,10 +441,14 @@ final class DocumentsWriter implements Closeable, Accountable {
      perThreadPool.release(perThread);
    }

-   return postUpdate(flushingDWPT, hasEvents);
+   if (postUpdate(flushingDWPT, hasEvents)) {
+     return -seqNo;
+   } else {
+     return seqNo;
+   }
  }

- boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
+ long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
                         final Term delTerm) throws IOException, AbortingException {
    boolean hasEvents = preUpdate();

@@ -437,6 +456,7 @@ final class DocumentsWriter implements Closeable, Accountable {
    final ThreadState perThread = flushControl.obtainAndLock();
    final DocumentsWriterPerThread flushingDWPT;
+   final long seqNo;
    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:

@@ -446,7 +466,7 @@ final class DocumentsWriter implements Closeable, Accountable {
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
-       dwpt.updateDocument(doc, analyzer, delTerm);
+       seqNo = dwpt.updateDocument(doc, analyzer, delTerm);
      } catch (AbortingException ae) {
        flushControl.doOnAbort(perThread);
        dwpt.abort();

@@ -463,7 +483,11 @@ final class DocumentsWriter implements Closeable, Accountable {
      perThreadPool.release(perThread);
    }

-   return postUpdate(flushingDWPT, hasEvents);
+   if (postUpdate(flushingDWPT, hasEvents)) {
+     return -seqNo;
+   } else {
+     return seqNo;
+   }
  }

  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {

@@ -587,20 +611,22 @@ final class DocumentsWriter implements Closeable, Accountable {
   * two stage operation; the caller must ensure (in try/finally) that finishFlush
   * is called after this method, to release the flush lock in DWFlushControl
   */
- boolean flushAllThreads()
+ long flushAllThreads()
    throws IOException, AbortingException {
    final DocumentsWriterDeleteQueue flushingDeleteQueue;
    if (infoStream.isEnabled("DW")) {
      infoStream.message("DW", "startFullFlush");
    }

+   long seqNo;
    synchronized (this) {
      pendingChangesInCurrentFullFlush = anyChanges();
      flushingDeleteQueue = deleteQueue;
      /* Cutover to a new delete queue. This must be synced on the flush control
       * otherwise a new DWPT could sneak into the loop with an already flushing
       * delete queue */
-     flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+     seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
      assert setFlushingDeleteQueue(flushingDeleteQueue);
    }
    assert currentFullFlushDelQueue != null;

@@ -620,13 +646,17 @@ final class DocumentsWriter implements Closeable, Accountable {
          infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
        }
        ticketQueue.addDeletes(flushingDeleteQueue);
      }
      ticketQueue.forcePurge(writer);
      assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
    } finally {
      assert flushingDeleteQueue == currentFullFlushDelQueue;
    }
-   return anythingFlushed;
+   if (anythingFlushed) {
+     return -seqNo;
+   } else {
+     return seqNo;
+   }
  }

  void finishFullFlush(IndexWriter indexWriter, boolean success) {

@@ -646,7 +676,6 @@ final class DocumentsWriter implements Closeable, Accountable {
    } finally {
      pendingChangesInCurrentFullFlush = false;
    }
  }

  public LiveIndexWriterConfig getIndexWriterConfig() {
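The pattern above recurs throughout this change: a method returns the operation's sequence number, negated when the call also discovered flush/apply-deletes work to do. A sketch of the decoding side, assuming a DocumentsWriter instance named docWriter (sequence numbers start at 1, so the sign is never ambiguous):

    long seqNo = docWriter.deleteTerms(term);  // may come back negated
    boolean mustFlush = seqNo < 0;             // negative => caller has flush work to do
    if (mustFlush) {
      seqNo = -seqNo;                          // recover the real sequence number
    }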

View File: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java

@@ -17,6 +17,7 @@
package org.apache.lucene.index;

import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;

@@ -68,12 +69,15 @@ import org.apache.lucene.util.BytesRef;
 */
final class DocumentsWriterDeleteQueue implements Accountable {

+ // the current end (latest delete operation) in the delete queue:
  private volatile Node<?> tail;

  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue,Node> tailUpdater = AtomicReferenceFieldUpdater
    .newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");

+ /** Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of
+  * deletes and insert into the buffered updates stream before the newly flushed segment(s). */
  private final DeleteSlice globalSlice;
  private final BufferedUpdates globalBufferedUpdates;

@@ -81,18 +85,26 @@ final class DocumentsWriterDeleteQueue implements Accountable {
  final ReentrantLock globalBufferLock = new ReentrantLock();

  final long generation;

+ /** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
+ private final AtomicLong nextSeqNo;

+ // for asserts
+ long maxSeqNo = Long.MAX_VALUE;

  DocumentsWriterDeleteQueue() {
-   this(0);
+   // seqNo must start at 1 because some APIs negate this to also return a boolean
+   this(0, 1);
  }

- DocumentsWriterDeleteQueue(long generation) {
-   this(new BufferedUpdates(), generation);
+ DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
+   this(new BufferedUpdates("global"), generation, startSeqNo);
  }

- DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation) {
+ DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
    this.globalBufferedUpdates = globalBufferedUpdates;
    this.generation = generation;
+   this.nextSeqNo = new AtomicLong(startSeqNo);
    /*
     * we use a sentinel instance as our initial tail. No slice will ever try to
     * apply this tail since the head is always omitted.

@@ -101,28 +113,30 @@ final class DocumentsWriterDeleteQueue implements Accountable {
    globalSlice = new DeleteSlice(tail);
  }

- void addDelete(Query... queries) {
-   add(new QueryArrayNode(queries));
+ long addDelete(Query... queries) {
+   long seqNo = add(new QueryArrayNode(queries));
    tryApplyGlobalSlice();
+   return seqNo;
  }

- void addDelete(Term... terms) {
-   add(new TermArrayNode(terms));
+ long addDelete(Term... terms) {
+   long seqNo = add(new TermArrayNode(terms));
    tryApplyGlobalSlice();
+   return seqNo;
  }

- void addDocValuesUpdates(DocValuesUpdate... updates) {
-   add(new DocValuesUpdatesNode(updates));
+ long addDocValuesUpdates(DocValuesUpdate... updates) {
+   long seqNo = add(new DocValuesUpdatesNode(updates));
    tryApplyGlobalSlice();
+   return seqNo;
  }

  /**
   * invariant for document update
   */
- void add(Term term, DeleteSlice slice) {
+ long add(Term term, DeleteSlice slice) {
    final TermNode termNode = new TermNode(term);
-   // System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
-   add(termNode);
+   long seqNo = add(termNode);
    /*
     * this is an update request where the term is the updated documents
     * delTerm. in that case we need to guarantee that this insert is atomic

@@ -137,42 +151,14 @@ final class DocumentsWriterDeleteQueue implements Accountable {
    assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
    tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
    // we can do it just every n times or so?
+   return seqNo;
  }

- void add(Node<?> item) {
-   /*
-    * this non-blocking / 'wait-free' linked list add was inspired by Apache
-    * Harmony's ConcurrentLinkedQueue Implementation.
-    */
-   while (true) {
-     final Node<?> currentTail = this.tail;
-     final Node<?> tailNext = currentTail.next;
-     if (tail == currentTail) {
-       if (tailNext != null) {
-         /*
-          * we are in intermediate state here. the tails next pointer has been
-          * advanced but the tail itself might not be updated yet. help to
-          * advance the tail and try again updating it.
-          */
-         tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
-       } else {
-         /*
-          * we are in quiescent state and can try to insert the item to the
-          * current tail if we fail to insert we just retry the operation since
-          * somebody else has already added its item
-          */
-         if (currentTail.casNext(null, item)) {
-           /*
-            * now that we are done we need to advance the tail while another
-            * thread could have advanced it already so we can ignore the return
-            * type of this CAS call
-            */
-           tailUpdater.compareAndSet(this, currentTail, item);
-           return;
-         }
-       }
-     }
-   }
+ synchronized long add(Node<?> newNode) {
+   tail.next = newNode;
+   this.tail = newNode;
+   return getNextSequenceNumber();
  }

  boolean anyChanges() {

@@ -183,8 +169,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
       * and if the global slice is up-to-date
       * and if globalBufferedUpdates has changes
       */
-     return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail
-       || tail.next != null;
+     return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail || tail.next != null;
    } finally {
      globalBufferLock.unlock();
    }

@@ -199,8 +184,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
     * tail the next time we can get the lock!
     */
    try {
-     if (updateSlice(globalSlice)) {
-       // System.out.println(Thread.currentThread() + ": apply globalSlice");
+     if (updateSliceNoSeqNo(globalSlice)) {
        globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
      }
    } finally {

@@ -229,9 +213,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
        globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
      }
-     // System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
-     final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(
-       globalBufferedUpdates, false);
+     final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
      globalBufferedUpdates.clear();
      return packet;
    } finally {

@@ -243,8 +225,21 @@ final class DocumentsWriterDeleteQueue implements Accountable {
    return new DeleteSlice(tail);
  }

- boolean updateSlice(DeleteSlice slice) {
-   if (slice.sliceTail != tail) { // If we are the same just
+ /** Negative result means there were new deletes since we last applied */
+ synchronized long updateSlice(DeleteSlice slice) {
+   long seqNo = getNextSequenceNumber();
+   if (slice.sliceTail != tail) {
+     // new deletes arrived since we last checked
+     slice.sliceTail = tail;
+     seqNo = -seqNo;
+   }
+   return seqNo;
+ }

+ /** Just like updateSlice, but does not assign a sequence number */
+ boolean updateSliceNoSeqNo(DeleteSlice slice) {
+   if (slice.sliceTail != tail) {
+     // new deletes arrived since we last checked
      slice.sliceTail = tail;
      return true;
    }

@@ -282,7 +277,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
        current = current.next;
        assert current != null : "slice property violated between the head on the tail must not be a null node";
        current.apply(del, docIDUpto);
-       // System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
      } while (current != sliceTail);
      reset();
    }

@@ -459,6 +453,20 @@ final class DocumentsWriterDeleteQueue implements Accountable {
  public String toString() {
    return "DWDQ: [ generation: " + generation + " ]";
  }

+ public long getNextSequenceNumber() {
+   long seqNo = nextSeqNo.getAndIncrement();
+   assert seqNo < maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
+   return seqNo;
+ }

+ public long getLastSequenceNumber() {
+   return nextSeqNo.get()-1;
+ }

+ /** Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers
+  * inside the gap */
+ public void skipSequenceNumbers(long jump) {
+   nextSeqNo.addAndGet(jump);
+ }
}
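One plausible reading of why the wait-free CAS loop became a synchronized add: the sequence number has to be assigned in the same critical section that links the node, or two threads could publish nodes in one order and draw sequence numbers in the other. The new method, restated with comments:

    synchronized long add(Node<?> newNode) {
      tail.next = newNode;             // link behind the current tail
      this.tail = newNode;             // publish the new tail (volatile write)
      return getNextSequenceNumber();  // seqNo order now matches queue order
    }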

View File: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java

@@ -141,8 +141,7 @@ final class DocumentsWriterFlushControl implements Accountable {
  }

  private void commitPerThreadBytes(ThreadState perThread) {
-   final long delta = perThread.dwpt.bytesUsed()
-     - perThread.bytesUsed;
+   final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed;
    perThread.bytesUsed += delta;
    /*
     * We need to differentiate here if we are pending since setFlushPending

@@ -167,8 +166,7 @@ final class DocumentsWriterFlushControl implements Accountable {
    return true;
  }

- synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
-                                                       boolean isUpdate) {
+ synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
    try {
      commitPerThreadBytes(perThread);
      if (!perThread.flushPending) {

@@ -192,7 +190,7 @@ final class DocumentsWriterFlushControl implements Accountable {
        flushingDWPT = null;
      }
    } else {
      flushingDWPT = tryCheckoutForFlush(perThread);
    }
    return flushingDWPT;
  } finally {

@@ -454,8 +452,7 @@ final class DocumentsWriterFlushControl implements Accountable {
      .currentThread(), documentsWriter);
    boolean success = false;
    try {
-     if (perThread.isInitialized()
-         && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
+     if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
        // There is a flush-all in process and this DWPT is
        // now stale -- enroll it for flush and try for
        // another DWPT:

@@ -471,8 +468,9 @@ final class DocumentsWriterFlushControl implements Accountable {
    }
  }

- void markForFullFlush() {
+ long markForFullFlush() {
    final DocumentsWriterDeleteQueue flushingQueue;
+   long seqNo;
    synchronized (this) {
      assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
      assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;

@@ -480,7 +478,14 @@ final class DocumentsWriterFlushControl implements Accountable {
      flushingQueue = documentsWriter.deleteQueue;
      // Set a new delete queue - all subsequent DWPT will use this queue until
      // we do another full flush
-     DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
+
+     // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
+     // if we have some sequence numbers that were never assigned:
+     seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+     flushingQueue.maxSeqNo = seqNo+1;
+
+     DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
      documentsWriter.deleteQueue = newQueue;
    }
    final int limit = perThreadPool.getActiveThreadStateCount();

@@ -520,6 +525,7 @@ final class DocumentsWriterFlushControl implements Accountable {
      updateStallState();
    }
    assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+   return seqNo;
  }

  private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
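A worked example of the gap arithmetic above, with assumed numbers: suppose the flushing queue last handed out sequence number 100 and there are 8 active ThreadStates, each of which may have at most one operation in flight against the old queue:

    long last = 100;                 // flushingQueue.getLastSequenceNumber()
    int  active = 8;                 // perThreadPool.getActiveThreadStateCount()
    long seqNo = last + active + 2;  // 110, returned for the full flush itself
    // flushingQueue.maxSeqNo = 111: the old queue may still (assert-checked) issue
    // numbers up to 110 for in-flight operations, while the new queue starts at
    // 111, so the two queues can never hand out the same sequence number.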

View File: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java

@@ -171,11 +171,10 @@ class DocumentsWriterPerThread {
    this.pendingNumDocs = pendingNumDocs;
    bytesUsed = Counter.newCounter();
    byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
-   pendingUpdates = new BufferedUpdates();
+   pendingUpdates = new BufferedUpdates(segmentName);
    intBlockAllocator = new IntBlockAllocator(bytesUsed);
    this.deleteQueue = deleteQueue;
    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
-   pendingUpdates.clear();
    deleteSlice = deleteQueue.newSlice();
    segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);

@@ -210,7 +209,7 @@ class DocumentsWriterPerThread {
    }
  }

- public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
+ public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocument start");
    assert deleteQueue != null;
    reserveOneDoc();

@@ -241,10 +240,11 @@ class DocumentsWriterPerThread {
        numDocsInRAM++;
      }
    }
-   finishDocument(delTerm);
+   return finishDocument(delTerm);
  }

- public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
+ public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocuments start");
    assert deleteQueue != null;
    docState.analyzer = analyzer;

@@ -278,19 +278,32 @@ class DocumentsWriterPerThread {
            numDocsInRAM++;
          }
        }
-       finishDocument(null);
+       numDocsInRAM++;
      }
      allDocsIndexed = true;

      // Apply delTerm only after all indexing has
      // succeeded, but apply it only to docs prior to when
      // this batch started:
+     long seqNo;
      if (delTerm != null) {
-       deleteQueue.add(delTerm, deleteSlice);
+       seqNo = deleteQueue.add(delTerm, deleteSlice);
        assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
        deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+       return seqNo;
+     } else {
+       seqNo = deleteQueue.updateSlice(deleteSlice);
+       if (seqNo < 0) {
+         seqNo = -seqNo;
+         deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+       } else {
+         deleteSlice.reset();
+       }
      }
+     return seqNo;
    } finally {
      if (!allDocsIndexed && !aborted) {
        // the iterator threw an exception that is not aborting

@@ -304,11 +317,9 @@ class DocumentsWriterPerThread {
      }
      docState.clear();
    }
-   return docCount;
  }

- private void finishDocument(Term delTerm) {
+ private long finishDocument(Term delTerm) {
    /*
     * here we actually finish the document in two steps 1. push the delete into
     * the queue and update our slice. 2. increment the DWPT private document

@@ -318,11 +329,18 @@ class DocumentsWriterPerThread {
     * since we updated the slice the last time.
     */
    boolean applySlice = numDocsInRAM != 0;
+   long seqNo;
    if (delTerm != null) {
-     deleteQueue.add(delTerm, deleteSlice);
+     seqNo = deleteQueue.add(delTerm, deleteSlice);
      assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
    } else {
-     applySlice &= deleteQueue.updateSlice(deleteSlice);
+     seqNo = deleteQueue.updateSlice(deleteSlice);
+     if (seqNo < 0) {
+       seqNo = -seqNo;
+     } else {
+       applySlice = false;
+     }
    }

    if (applySlice) {

@@ -331,6 +349,8 @@ class DocumentsWriterPerThread {
      deleteSlice.reset();
    }
    ++numDocsInRAM;
+   return seqNo;
  }

  // Buffer a specific docID for deletion. Currently only

View File: lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java

@@ -226,22 +226,8 @@ final class DocumentsWriterPerThreadPool {
    return threadStates.get(ord);
  }

+ // TODO: merge this with getActiveThreadStateCount: they are the same!
  synchronized int getMaxThreadStates() {
    return threadStates.size();
  }

- /**
-  * Returns the ThreadState with the minimum estimated number of threads
-  * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
-  * is yet visible to the calling thread.
-  */
- ThreadState minContendedThreadState() {
-   ThreadState minThreadState = null;
-   for (ThreadState state : threadStates) {
-     if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
-       minThreadState = state;
-     }
-   }
-   return minThreadState;
- }
}

View File: lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -95,6 +95,14 @@ import org.apache.lucene.util.Version;
  and then adds the entire document). When finished adding, deleting
  and updating documents, {@link #close() close} should be called.</p>

+ <a name="sequence_number"></a>
+ <p>Each method that changes the index returns a {@code long} sequence number, which
+ expresses the effective order in which each change was applied.
+ {@link #commit} also returns a sequence number, describing which
+ changes are in the commit point and which are not. Sequence numbers
+ are transient (not saved into the index in any way) and only valid
+ within a single {@code IndexWriter} instance.</p>
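A hedged sketch of what this enables (the "id" field and the two competing documents are assumptions, not part of this commit): when two threads race to update the same document, the larger sequence number identifies the update that wins the effective order, and the value returned by commit() tells you which operations made the commit point:

    long seqA = writer.updateDocument(new Term("id", "1"), docA);  // thread 1
    long seqB = writer.updateDocument(new Term("id", "1"), docB);  // thread 2

    long cutoff = writer.commit();
    boolean bWon = seqB > seqA;         // the later seqNo is what searchers will see
    boolean aDurable = seqA <= cutoff;  // ops <= cutoff are in the commit point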
<a name="flush"></a> <a name="flush"></a>
<p>These changes are buffered in memory and periodically <p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method flushed to the {@link Directory} (during the above method
@@ -266,6 +274,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
  private List<SegmentCommitInfo> rollbackSegments; // list of segmentInfo we will fallback to if the commit fails

  volatile SegmentInfos pendingCommit;              // set when a commit is pending (after prepareCommit() & before commit())
+ volatile long pendingSeqNo;
  volatile long pendingCommitChangeCount;

  private Collection<String> filesToCommit;
@@ -425,7 +434,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      boolean success = false;
      synchronized (fullFlushLock) {
        try {
-         anyChanges = docWriter.flushAllThreads();
+         // TODO: should we somehow make this available in the returned NRT reader?
+         long seqNo = docWriter.flushAllThreads();
+         if (seqNo < 0) {
+           anyChanges = true;
+           seqNo = -seqNo;
+         } else {
+           anyChanges = false;
+         }
          if (!anyChanges) {
            // prevent double increment since docWriter#doFlush increments the flushcount
            // if we flushed anything.
@@ -1280,11 +1296,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * replaced with the Unicode replacement character
   * U+FFFD.</p>
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
- public void addDocument(Iterable<? extends IndexableField> doc) throws IOException {
-   updateDocument(null, doc);
+ public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
+   return updateDocument(null, doc);
  }

  /**
@@ -1319,13 +1338,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * and will likely break them up. Use such tools at your
   * own risk!
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   *
   * @lucene.experimental
   */
- public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
-   updateDocuments(null, docs);
+ public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+   return updateDocuments(null, docs);
  }

  /**
@@ -1336,20 +1358,26 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   *
   * See {@link #addDocuments(Iterable)}.
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   *
   * @lucene.experimental
   */
- public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+ public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
    ensureOpen();
    try {
      boolean success = false;
      try {
-       if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
+       long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm);
+       if (seqNo < 0) {
+         seqNo = -seqNo;
          processEvents(true, false);
        }
        success = true;
+       return seqNo;
      } finally {
        if (!success) {
          if (infoStream.isEnabled("IW")) {

@@ -1359,6 +1387,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      }
    } catch (AbortingException | VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocuments");
+     // dead code but javac disagrees
+     return -1;
    }
  }
@@ -1367,15 +1398,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * DirectoryReader#open(IndexWriter)}). If the
   * provided reader is an NRT reader obtained from this
   * writer, and its segment has not been merged away, then
-  * the delete succeeds and this method returns true; else, it
-  * returns false the caller must then separately delete by
-  * Term or Query.
+  * the delete succeeds and this method returns a valid (&gt; 0) sequence
+  * number; else, it returns -1 and the caller must then
+  * separately delete by Term or Query.
   *
   * <b>NOTE</b>: this method can only delete documents
   * visible to the currently open NRT reader. If you need
   * to delete documents indexed after opening the NRT
   * reader you must use {@link #deleteDocuments(Term...)}). */
- public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+ public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {

    final LeafReader reader;
    if (readerIn instanceof LeafReader) {

@@ -1426,7 +1457,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
            changed();
          }
          //System.out.println("  yes " + info.info.name + " " + docID);
-         return true;
+         return docWriter.deleteQueue.getNextSequenceNumber();
        }
      } else {
        //System.out.println("  no rld " + info.info.name + " " + docID);

@@ -1434,7 +1466,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    } else {
      //System.out.println("  no seg " + info.info.name + " " + docID);
    }
-   return false;
+   return -1;
  }
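A hedged usage sketch of the new contract (nrtReader, docID, and the "id" field are assumptions): try the cheap in-place NRT delete first, and fall back to a Term delete when -1 comes back:

    long seqNo = writer.tryDeleteDocument(nrtReader, docID);
    if (seqNo == -1) {
      // the reader's segment was merged away; delete by Term instead
      seqNo = writer.deleteDocuments(new Term("id", idValue));
    }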
  /**

@@ -1442,19 +1475,28 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * terms. All given deletes are applied and flushed atomically
   * at the same time.
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @param terms array of terms to identify the documents
   * to be deleted
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
- public void deleteDocuments(Term... terms) throws IOException {
+ public long deleteDocuments(Term... terms) throws IOException {
    ensureOpen();
    try {
-     if (docWriter.deleteTerms(terms)) {
+     long seqNo = docWriter.deleteTerms(terms);
+     if (seqNo < 0) {
+       seqNo = -seqNo;
        processEvents(true, false);
      }
+     return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Term..)");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -1462,28 +1504,37 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * Deletes the document(s) matching any of the provided queries.
   * All given deletes are applied and flushed atomically at the same time.
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @param queries array of queries to identify the documents
   * to be deleted
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
- public void deleteDocuments(Query... queries) throws IOException {
+ public long deleteDocuments(Query... queries) throws IOException {
    ensureOpen();

    // LUCENE-6379: Specialize MatchAllDocsQuery
    for(Query query : queries) {
      if (query.getClass() == MatchAllDocsQuery.class) {
-       deleteAll();
-       return;
+       return deleteAll();
      }
    }

    try {
-     if (docWriter.deleteQueries(queries)) {
+     long seqNo = docWriter.deleteQueries(queries);
+     if (seqNo < 0) {
+       seqNo = -seqNo;
        processEvents(true, false);
      }
+     return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Query..)");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -1494,21 +1545,27 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * by a reader on the same index (flush may happen only after
   * the add).
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @param term the term to identify the document(s) to be
   * deleted
   * @param doc the document to be added
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
- public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
+ public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    try {
      boolean success = false;
      try {
-       if (docWriter.updateDocument(doc, analyzer, term)) {
+       long seqNo = docWriter.updateDocument(doc, analyzer, term);
+       if (seqNo < 0) {
+         seqNo = - seqNo;
          processEvents(true, false);
        }
        success = true;
+       return seqNo;
      } finally {
        if (!success) {
          if (infoStream.isEnabled("IW")) {

@@ -1518,6 +1575,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      }
    } catch (AbortingException | VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocument");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -1532,22 +1592,32 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * field name of the {@link NumericDocValues} field
   * @param value
   * new value for the field
+  *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException
   * if the index is corrupt
   * @throws IOException
   * if there is a low-level IO error
   */
- public void updateNumericDocValue(Term term, String field, long value) throws IOException {
+ public long updateNumericDocValue(Term term, String field, long value) throws IOException {
    ensureOpen();
    if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) {
      throw new IllegalArgumentException("can only update existing numeric-docvalues fields!");
    }
    try {
-     if (docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value))) {
+     long seqNo = docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value));
+     if (seqNo < 0) {
+       seqNo = -seqNo;
        processEvents(true, false);
      }
+     return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateNumericDocValue");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -1566,12 +1636,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * field name of the {@link BinaryDocValues} field
   * @param value
   * new value for the field
+  *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException
   * if the index is corrupt
   * @throws IOException
   * if there is a low-level IO error
   */
- public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
+ public long updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
    ensureOpen();
    if (value == null) {
      throw new IllegalArgumentException("cannot update a field to a null value: " + field);

@@ -1580,11 +1654,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      throw new IllegalArgumentException("can only update existing binary-docvalues fields!");
    }
    try {
-     if (docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value))) {
+     long seqNo = docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value));
+     if (seqNo < 0) {
+       seqNo = -seqNo;
        processEvents(true, false);
      }
+     return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateBinaryDocValue");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -1596,12 +1676,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   *
   * @param updates
   * the updates to apply
+  *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException
   * if the index is corrupt
   * @throws IOException
   * if there is a low-level IO error
   */
- public void updateDocValues(Term term, Field... updates) throws IOException {
+ public long updateDocValues(Term term, Field... updates) throws IOException {
    ensureOpen();
    DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length];
    for (int i = 0; i < updates.length; i++) {

@@ -1628,11 +1712,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      }
    }
    try {
-     if (docWriter.updateDocValues(dvUpdates)) {
+     long seqNo = docWriter.updateDocValues(dvUpdates);
+     if (seqNo < 0) {
+       seqNo = -seqNo;
        processEvents(true, false);
      }
+     return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocValues");
+     // dead code but javac disagrees:
+     return -1;
    }
  }
@@ -2204,8 +2294,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   * threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])}
   * or {@link #forceMergeDeletes} methods, they may receive
   * {@link MergePolicy.MergeAbortedException}s.
+  *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
   */
- public void deleteAll() throws IOException {
+ public long deleteAll() throws IOException {
    ensureOpen();
    // Remove any buffered docs
    boolean success = false;

@@ -2252,6 +2345,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
          globalFieldNumberMap.clear();
          success = true;
+
+         return docWriter.deleteQueue.getNextSequenceNumber();
        } finally {
          docWriter.unlockAllAfterAbortAll(this);
          if (!success) {

@@ -2264,6 +2359,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      }
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteAll");
+     // dead code but javac disagrees
+     return -1;
    }
  }
@@ -2485,13 +2583,16 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
   *
   * <p>This requires this index not be among those to be added.
   *
+  * @return The <a href="#sequence_number">sequence number</a>
+  * for this operation
+  *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   * @throws IllegalArgumentException if addIndexes would cause
   * the index to exceed {@link #MAX_DOCS}, or if the incoming
   * index sort does not match this index's index sort
   */
- public void addIndexes(Directory... dirs) throws IOException {
+ public long addIndexes(Directory... dirs) throws IOException {
    ensureOpen();

    noDupDirs(dirs);

@@ -2502,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    boolean successTop = false;

+   long seqNo;
    try {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "flush at addIndexes(Directory...)");

@@ -2573,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
        // Now reserve the docs, just before we update SIS:
        reserveDocs(totalMaxDoc);
+
+       seqNo = docWriter.deleteQueue.getNextSequenceNumber();
        success = true;
      } finally {
        if (!success) {

@@ -2590,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "addIndexes(Directory...)");
+     // dead code but javac disagrees:
+     seqNo = -1;
    } finally {
      if (successTop) {
        IOUtils.close(locks);

@@ -2598,6 +2705,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
      }
    }
    maybeMerge();
+
+   return seqNo;
  }
/** /**
@ -2622,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one * {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call. * call.
* *
* @return The <a href="#sequence_number">sequence number</a>
* for this operation
*
* @throws CorruptIndexException * @throws CorruptIndexException
* if the index is corrupt * if the index is corrupt
* @throws IOException * @throws IOException
@ -2629,7 +2741,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IllegalArgumentException * @throws IllegalArgumentException
* if addIndexes would cause the index to exceed {@link #MAX_DOCS} * if addIndexes would cause the index to exceed {@link #MAX_DOCS}
*/ */
public void addIndexes(CodecReader... readers) throws IOException { public long addIndexes(CodecReader... readers) throws IOException {
ensureOpen(); ensureOpen();
// long so we can detect int overflow: // long so we can detect int overflow:
@ -2637,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
Sort indexSort = config.getIndexSort(); Sort indexSort = config.getIndexSort();
long seqNo;
try { try {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(CodecReader...)"); infoStream.message("IW", "flush at addIndexes(CodecReader...)");
@ -2671,7 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null)); rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) { if (!merger.shouldMerge()) {
return; return docWriter.deleteQueue.getNextSequenceNumber();
} }
merger.merge(); // merge 'em merger.merge(); // merge 'em
@ -2689,7 +2803,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) { if (stopMerges) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
return;
return docWriter.deleteQueue.getNextSequenceNumber();
} }
ensureOpen(); ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this); useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@ -2724,7 +2839,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) { if (stopMerges) {
// Safe: these files must exist // Safe: these files must exist
deleteNewFiles(infoPerCommit.files()); deleteNewFiles(infoPerCommit.files());
return;
return docWriter.deleteQueue.getNextSequenceNumber();
} }
ensureOpen(); ensureOpen();
@ -2732,12 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
reserveDocs(numDocs); reserveDocs(numDocs);
segmentInfos.add(infoPerCommit); segmentInfos.add(infoPerCommit);
seqNo = docWriter.deleteQueue.getNextSequenceNumber();
checkpoint(); checkpoint();
} }
} catch (VirtualMachineError tragedy) { } catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)"); tragicEvent(tragedy, "addIndexes(CodecReader...)");
// dead code but javac disagrees:
seqNo = -1;
} }
maybeMerge(); maybeMerge();
return seqNo;
} }
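Both addIndexes overloads now report where the import falls in the writer's
effective single-threaded order. A minimal sketch of capturing that value,
assuming writer and readers are already in scope:

long seqNo = writer.addIndexes(readers);  // likewise for addIndexes(Directory...)
long commitSeqNo = writer.commit();
// commitSeqNo == -1 means the commit was a no-op; otherwise the import is
// included in the commit point, since seqNo <= commitSeqNo here.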
/** Copies the segment files as-is into the IndexWriter's directory. */ /** Copies the segment files as-is into the IndexWriter's directory. */
@ -2805,14 +2926,19 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p>You can also just call {@link #commit()} directly * <p>You can also just call {@link #commit()} directly
* without prepareCommit first in which case that method * without prepareCommit first in which case that method
* will internally call prepareCommit. * will internally call prepareCommit.
*
* @return The <a href="#sequence_number">sequence number</a>
* of the last operation in the commit. All sequence numbers &lt;= this value
* will be reflected in the commit, and all others will not.
*/ */
@Override @Override
public final void prepareCommit() throws IOException { public final long prepareCommit() throws IOException {
ensureOpen(); ensureOpen();
prepareCommitInternal(config.getMergePolicy()); pendingSeqNo = prepareCommitInternal(config.getMergePolicy());
return pendingSeqNo;
} }
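A hedged sketch of two-phase usage against the new contract (writer assumed in
scope; error handling condensed):

long prepared = writer.prepareCommit();  // -1 if there were no pending changes
try {
  // ... coordinate the other resources participating in the two-phase commit ...
  long committed = writer.commit();      // finishes the pending commit
  // when something was actually prepared, commit reports the same number:
  assert prepared == -1 || committed == prepared;
} catch (Exception e) {
  writer.rollback();                     // discard the prepared state
}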
private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException { private long prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
startCommitTime = System.nanoTime(); startCommitTime = System.nanoTime();
synchronized(commitLock) { synchronized(commitLock) {
ensureOpen(false); ensureOpen(false);
@ -2833,6 +2959,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
testPoint("startDoFlush"); testPoint("startDoFlush");
SegmentInfos toCommit = null; SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false; boolean anySegmentsFlushed = false;
long seqNo;
// This is copied from doFlush, except it's modified to // This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the // clone & incRef the flushed SegmentInfos inside the
@ -2844,7 +2971,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean flushSuccess = false; boolean flushSuccess = false;
boolean success = false; boolean success = false;
try { try {
anySegmentsFlushed = docWriter.flushAllThreads(); seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anySegmentsFlushed = true;
seqNo = -seqNo;
}
if (!anySegmentsFlushed) { if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount // prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything. // if we flushed anything.
@ -2898,6 +3029,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
} catch (AbortingException | VirtualMachineError tragedy) { } catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "prepareCommit"); tragicEvent(tragedy, "prepareCommit");
// dead code but javac disagrees:
seqNo = -1;
} }
boolean success = false; boolean success = false;
@ -2907,6 +3041,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
startCommit(toCommit); startCommit(toCommit);
success = true; success = true;
if (pendingCommit == null) {
return -1;
} else {
return seqNo;
}
} finally { } finally {
if (!success) { if (!success) {
synchronized (this) { synchronized (this) {
@ -2980,12 +3119,22 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* loss it may still lose data. Lucene cannot guarantee * loss it may still lose data. Lucene cannot guarantee
* consistency on such devices. </p> * consistency on such devices. </p>
* *
* <p> If nothing was committed, because there were no
* pending changes, this returns -1. Otherwise, it returns
* the sequence number such that all indexing operations
* prior to this sequence will be included in the commit
* point, and all other operations will not. </p>
*
* @see #prepareCommit * @see #prepareCommit
*
* @return The <a href="#sequence_number">sequence number</a>
* of the last operation in the commit. All sequence numbers &lt;= this value
* will be reflected in the commit, and all others will not.
*/ */
@Override @Override
public final void commit() throws IOException { public final long commit() throws IOException {
ensureOpen(); ensureOpen();
commitInternal(config.getMergePolicy()); return commitInternal(config.getMergePolicy());
} }
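Illustratively, a caller can now tell whether a given operation is covered by a
commit; a sketch assuming writer, id and doc are in scope:

long opSeqNo = writer.updateDocument(id, doc);
long commitSeqNo = writer.commit();
// -1 means nothing was pending; otherwise exactly the operations with
// seqNo <= commitSeqNo are reflected in the new commit point.
boolean durable = commitSeqNo != -1 && opSeqNo <= commitSeqNo;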
/** Returns true if there may be changes that have not been /** Returns true if there may be changes that have not been
@ -3001,7 +3150,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any(); return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
} }
private final void commitInternal(MergePolicy mergePolicy) throws IOException { private final long commitInternal(MergePolicy mergePolicy) throws IOException {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: start"); infoStream.message("IW", "commit: start");
@ -3014,18 +3163,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "commit: enter lock"); infoStream.message("IW", "commit: enter lock");
} }
long seqNo;
if (pendingCommit == null) { if (pendingCommit == null) {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: now prepare"); infoStream.message("IW", "commit: now prepare");
} }
prepareCommitInternal(mergePolicy); seqNo = prepareCommitInternal(mergePolicy);
} else { } else {
if (infoStream.isEnabled("IW")) { if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: already prepared"); infoStream.message("IW", "commit: already prepared");
} }
seqNo = pendingSeqNo;
} }
finishCommit(); finishCommit();
return seqNo;
} }
} }
@ -3167,7 +3321,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (fullFlushLock) { synchronized (fullFlushLock) {
boolean flushSuccess = false; boolean flushSuccess = false;
try { try {
anyChanges = docWriter.flushAllThreads(); long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
} else {
anyChanges = false;
}
if (!anyChanges) { if (!anyChanges) {
// flushCount is incremented in flushAllThreads // flushCount is incremented in flushAllThreads
flushCount.incrementAndGet(); flushCount.incrementAndGet();
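The sign of the long returned by flushAllThreads is doing double duty here and
in prepareCommitInternal above; a sketch of the convention, not a new API:

long seqNo = docWriter.flushAllThreads();
boolean anyChanges = seqNo < 0;  // negative encodes "at least one segment was flushed"
if (anyChanges) {
  seqNo = -seqNo;                // recover the actual sequence number
}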
@ -4888,4 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} }
}; };
} }
/** Returns the last <a href="#sequence_number">sequence number</a>, or 0
* if no index-changing operations have completed yet.
*
* @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
return docWriter.deleteQueue.getLastSequenceNumber();
}
} }
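A small sketch of the new accessor, assuming dir and analyzer are in scope; per
the javadoc, the value starts at 0 and advances with every index-changing call:

IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(analyzer));
long before = w.getLastSequenceNumber();    // 0 if nothing has changed the index yet
long seqNo = w.addDocument(new Document());
assert seqNo > before && w.getLastSequenceNumber() >= seqNo;
w.close();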

View File

@ -421,7 +421,7 @@ public final class StandardDirectoryReader extends DirectoryReader {
@Override @Override
public String toString() { public String toString() {
return "DirectoryReader.ReaderCommit(" + segmentsFileName + ")"; return "StandardDirectoryReader.ReaderCommit(" + segmentsFileName + " files=" + files + ")";
} }
@Override @Override

View File

@ -1,168 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.ControlledRealTimeReopenThread; // javadocs
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
/** Class that tracks changes to a delegated
* IndexWriter, used by {@link
* ControlledRealTimeReopenThread} to ensure specific
* changes are visible. Create this class (passing your
* IndexWriter), and then pass this class to {@link
* ControlledRealTimeReopenThread}.
* Be sure to make all changes via the
* TrackingIndexWriter, otherwise {@link
* ControlledRealTimeReopenThread} won't know about the changes.
*
* @lucene.experimental */
public class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);
/** Create a {@code TrackingIndexWriter} wrapping the
* provided {@link IndexWriter}. */
public TrackingIndexWriter(IndexWriter writer) {
this.writer = writer;
}
/** Calls {@link
* IndexWriter#updateDocument(Term,Iterable)} and
* returns the generation that reflects this change. */
public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
writer.updateDocument(t, d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link
* IndexWriter#updateDocuments(Term,Iterable)} and returns
* the generation that reflects this change. */
public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.updateDocuments(t, docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Term t) throws IOException {
writer.deleteDocuments(t);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Query...)} and
* returns the generation that reflects this change. */
public long deleteDocuments(Query q) throws IOException {
writer.deleteDocuments(q);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteDocuments(Query...)}
* and returns the generation that reflects this change. */
public long deleteDocuments(Query... queries) throws IOException {
writer.deleteDocuments(queries);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#deleteAll} and returns the
* generation that reflects this change. */
public long deleteAll() throws IOException {
writer.deleteAll();
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addDocument(Iterable)}
* and returns the generation that reflects this change. */
public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
writer.addDocument(d);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addDocuments(Iterable)} and
* returns the generation that reflects this change. */
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
writer.addDocuments(docs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addIndexes(Directory...)} and
* returns the generation that reflects this change. */
public long addIndexes(Directory... dirs) throws IOException {
writer.addIndexes(dirs);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Calls {@link IndexWriter#addIndexes(CodecReader...)}
* and returns the generation that reflects this change. */
public long addIndexes(CodecReader... readers) throws IOException {
writer.addIndexes(readers);
// Return gen as of when indexing finished:
return indexingGen.get();
}
/** Return the current generation being indexed. */
public long getGeneration() {
return indexingGen.get();
}
/** Return the wrapped {@link IndexWriter}. */
public IndexWriter getIndexWriter() {
return writer;
}
/** Return and increment current gen.
*
* @lucene.internal */
public long getAndIncrementGeneration() {
return indexingGen.getAndIncrement();
}
/** Calls {@link
* IndexWriter#tryDeleteDocument(IndexReader,int)} and
* returns the generation that reflects this change. */
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
if (writer.tryDeleteDocument(reader, docID)) {
return indexingGen.get();
} else {
return -1;
}
}
}

View File

@ -34,7 +34,7 @@ public interface TwoPhaseCommit {
* 2-phase commit fails, {@link #rollback()} is called to discard all changes * 2-phase commit fails, {@link #rollback()} is called to discard all changes
* since last successful commit. * since last successful commit.
*/ */
public void prepareCommit() throws IOException; public long prepareCommit() throws IOException;
/** /**
* The second phase of a 2-phase commit. Implementations should ideally do * The second phase of a 2-phase commit. Implementations should ideally do
@ -42,7 +42,7 @@ public interface TwoPhaseCommit {
* after it returns, the caller can assume that the changes were successfully * after it returns, the caller can assume that the changes were successfully
* committed to the underlying storage. * committed to the underlying storage.
*/ */
public void commit() throws IOException; public long commit() throws IOException;
/** /**
* Discards any changes that have occurred since the last commit. In a 2-phase * Discards any changes that have occurred since the last commit. In a 2-phase
@ -51,5 +51,4 @@ public interface TwoPhaseCommit {
* back to their previous state. * back to their previous state.
*/ */
public void rollback() throws IOException; public void rollback() throws IOException;
} }
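For implementors outside IndexWriter, a hypothetical sketch of the widened
contract; the AtomicLong numbering is an assumption, not part of the interface:

class SketchedTwoPhaseCommit implements TwoPhaseCommit {
  private final AtomicLong seqNo = new AtomicLong();

  @Override
  public long prepareCommit() throws IOException {
    // stage pending changes durably without making them visible yet
    return seqNo.incrementAndGet();
  }

  @Override
  public long commit() throws IOException {
    // make the prepared changes visible; report the same sequence number
    return seqNo.get();
  }

  @Override
  public void rollback() throws IOException {
    // discard anything staged since the last successful commit
  }
}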

View File

@ -23,16 +23,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
/** Utility class that runs a thread to manage periodic /** Utility class that runs a thread to manage periodic
* reopens of a {@link ReferenceManager}, with methods to wait for a specific * reopens of a {@link ReferenceManager}, with methods to wait for a specific
* index change to become visible. To use this class you * index change to become visible. When a given search request needs to see a specific
* must first wrap your {@link IndexWriter} with a {@link
* TrackingIndexWriter} and always use it to make changes
* to the index, saving the returned generation. Then,
* when a given search request needs to see a specific
* index change, call {@link #waitForGeneration} to wait for * index change, call {@link #waitForGeneration} to wait for
* that change to be visible. Note that this will only * that change to be visible. Note that this will only
* scale well if most searches do not need to wait for a * scale well if most searches do not need to wait for a
@ -44,7 +39,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
private final ReferenceManager<T> manager; private final ReferenceManager<T> manager;
private final long targetMaxStaleNS; private final long targetMaxStaleNS;
private final long targetMinStaleNS; private final long targetMinStaleNS;
private final TrackingIndexWriter writer; private final IndexWriter writer;
private volatile boolean finish; private volatile boolean finish;
private volatile long waitingGen; private volatile long waitingGen;
private volatile long searchingGen; private volatile long searchingGen;
@ -69,7 +64,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* is waiting for a specific generation to * is waiting for a specific generation to
* become visible. * become visible.
*/ */
public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) { public ControlledRealTimeReopenThread(IndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
if (targetMaxStaleSec < targetMinStaleSec) { if (targetMaxStaleSec < targetMinStaleSec) {
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")"); throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
} }
@ -155,10 +150,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* or false if maxMS wait time was exceeded * or false if maxMS wait time was exceeded
*/ */
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException { public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
final long curGen = writer.getGeneration(); final long curGen = writer.getLastSequenceNumber();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
}
if (targetGen > searchingGen) { if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has // Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should // changed, so it may wake up and realize it should
@ -240,7 +232,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
// Save the gen as of when we started the reopen; the // Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to // listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes: // searchingGen once the reopen completes:
refreshStartGen = writer.getAndIncrementGeneration(); refreshStartGen = writer.getLastSequenceNumber();
try { try {
manager.maybeRefreshBlocking(); manager.maybeRefreshBlocking();
} catch (IOException ioe) { } catch (IOException ioe) {
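With TrackingIndexWriter gone, the writer itself supplies the generation to
wait on. A hedged end-to-end sketch of the new workflow (dir and analyzer
assumed in scope; InterruptedException handling elided):

IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(analyzer));
SearcherManager manager = new SearcherManager(writer, new SearcherFactory());
ControlledRealTimeReopenThread<IndexSearcher> reopener =
    new ControlledRealTimeReopenThread<>(writer, manager, 1.0, 0.01);
reopener.start();

long gen = writer.addDocument(new Document());  // the seqNo doubles as the generation
reopener.waitForGeneration(gen);                // block until a reopen covers it
IndexSearcher searcher = manager.acquire();
try {
  // searcher now reflects the added document
} finally {
  manager.release(searcher);
}
reopener.close();
writer.close();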

View File

@ -149,6 +149,11 @@ public abstract class IndexInput extends DataInput implements Cloneable,Closeabl
slice.seek(pos); slice.seek(pos);
return slice.readLong(); return slice.readLong();
} }
@Override
public String toString() {
return "RandomAccessInput(" + IndexInput.this.toString() + ")";
}
}; };
} }
} }

View File

@ -43,8 +43,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
} }
DeleteSlice slice1 = queue.newSlice(); DeleteSlice slice1 = queue.newSlice();
DeleteSlice slice2 = queue.newSlice(); DeleteSlice slice2 = queue.newSlice();
BufferedUpdates bd1 = new BufferedUpdates(); BufferedUpdates bd1 = new BufferedUpdates("bd1");
BufferedUpdates bd2 = new BufferedUpdates(); BufferedUpdates bd2 = new BufferedUpdates("bd2");
int last1 = 0; int last1 = 0;
int last2 = 0; int last2 = 0;
Set<Term> uniqueValues = new HashSet<>(); Set<Term> uniqueValues = new HashSet<>();
@ -225,7 +225,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
this.index = index; this.index = index;
this.ids = ids; this.ids = ids;
this.slice = queue.newSlice(); this.slice = queue.newSlice();
deletes = new BufferedUpdates(); deletes = new BufferedUpdates("deletes");
this.latch = latch; this.latch = latch;
} }

View File

@ -93,7 +93,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getIndexingChain"); getters.add("getIndexingChain");
getters.add("getMergedSegmentWarmer"); getters.add("getMergedSegmentWarmer");
getters.add("getMergePolicy"); getters.add("getMergePolicy");
getters.add("getMaxThreadStates");
getters.add("getReaderPooling"); getters.add("getReaderPooling");
getters.add("getIndexerThreadPool"); getters.add("getIndexerThreadPool");
getters.add("getFlushPolicy"); getters.add("getFlushPolicy");

View File

@ -1238,8 +1238,8 @@ public class TestIndexWriterDelete extends LuceneTestCase {
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
w = new IndexWriter(d, iwc); w = new IndexWriter(d, iwc);
IndexReader r = DirectoryReader.open(w, false, false); IndexReader r = DirectoryReader.open(w, false, false);
assertTrue(w.tryDeleteDocument(r, 1)); assertTrue(w.tryDeleteDocument(r, 1) != -1);
assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0)); assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0) != -1);
r.close(); r.close();
w.close(); w.close();

View File

@ -0,0 +1,640 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
public class TestIndexingSequenceNumbers extends LuceneTestCase {
public void testBasic() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testAfterRefresh() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
DirectoryReader.open(w).close();
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testAfterCommit() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
w.commit();
long b = w.addDocument(new Document());
assertTrue(b > a);
w.close();
dir.close();
}
public void testStressUpdateSameID() throws Exception {
int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) {
Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 5)];
final CountDownLatch startingGun = new CountDownLatch(1);
final long[] seqNos = new long[threads.length];
final Term id = new Term("id", "id");
// multiple threads update the same document
for(int i=0;i<threads.length;i++) {
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "id", Field.Store.NO));
startingGun.await();
for(int j=0;j<100;j++) {
if (random().nextBoolean()) {
seqNos[threadID] = w.updateDocument(id, doc);
} else {
List<Document> docs = new ArrayList<>();
docs.add(doc);
seqNos[threadID] = w.updateDocuments(id, docs);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
// now confirm that the reported sequence numbers agree with the index:
int maxThread = 0;
Set<Long> allSeqNos = new HashSet<>();
for(int i=0;i<threads.length;i++) {
allSeqNos.add(seqNos[i]);
if (seqNos[i] > seqNos[maxThread]) {
maxThread = i;
}
}
// make sure all sequence numbers were different
assertEquals(threads.length, allSeqNos.size());
DirectoryReader r = w.getReader();
IndexSearcher s = newSearcher(r);
TopDocs hits = s.search(new TermQuery(id), 1);
assertEquals(1, hits.totalHits);
Document doc = r.document(hits.scoreDocs[0].doc);
assertEquals(maxThread, doc.getField("thread").numericValue().intValue());
r.close();
w.close();
dir.close();
}
}
static class Operation {
// 0 = update, 1 = delete, 2 = commit, 3 = add
byte what;
int id;
int threadID;
long seqNo;
}
public void testStressConcurrentCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 10);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
List<List<Operation>> threadOps = new ArrayList<>();
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
// multiple threads update the same set of documents, and we randomly commit, recording the commit seqNo and then opening each commit in
// the end to verify it reflects the correct updates
for(int i=0;i<threads.length;i++) {
final List<Operation> ops = new ArrayList<>();
threadOps.add(ops);
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
for(int i=0;i<opCount;i++) {
Operation op = new Operation();
op.threadID = threadID;
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
op.seqNo = w.commit();
if (op.seqNo != -1) {
commits.add(op);
}
}
} else {
op.id = random().nextInt(idCount);
Term idTerm = new Term("id", "" + op.id);
if (random().nextInt(10) == 1) {
op.what = 1;
if (random().nextBoolean()) {
op.seqNo = w.deleteDocuments(idTerm);
} else {
op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
}
} else {
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
if (random().nextBoolean()) {
List<Document> docs = new ArrayList<>();
docs.add(doc);
op.seqNo = w.updateDocuments(idTerm, docs);
} else {
op.seqNo = w.updateDocument(idTerm, doc);
}
op.what = 0;
}
ops.add(op);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
if (commitOp.seqNo != -1) {
commits.add(commitOp);
}
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
int[] expectedThreadIDs = new int[idCount];
long[] seqNos = new long[idCount];
//System.out.println("TEST: " + commits.size() + " commits");
for(int i=0;i<commits.size();i++) {
// this commit point should reflect all operations <= this seqNo
long commitSeqNo = commits.get(i).seqNo;
//System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
Arrays.fill(expectedThreadIDs, -1);
Arrays.fill(seqNos, 0);
for(int threadID=0;threadID<threadOps.size();threadID++) {
long lastSeqNo = 0;
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
if (op.what == 0) {
expectedThreadIDs[op.id] = threadID;
} else {
expectedThreadIDs[op.id] = -1;
}
}
assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
IndexSearcher s = new IndexSearcher(r);
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
if (expectedThreadIDs[id] != -1) {
assertEquals(1, hits.totalHits);
Document doc = r.document(hits.scoreDocs[0].doc);
int actualThreadID = doc.getField("thread").numericValue().intValue();
if (expectedThreadIDs[id] != actualThreadID) {
System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
}
}
}
assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
}
} else if (hits.totalHits != 0) {
System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
}
}
}
assertEquals(0, hits.totalHits);
}
}
w.close();
r.close();
}
dir.close();
}
public void testStressConcurrentDocValuesUpdatesCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 10);
if (VERBOSE) {
System.out.println("TEST: numThreads=" + numThreads);
}
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
List<List<Operation>> threadOps = new ArrayList<>();
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
List<Operation> ops1 = new ArrayList<>();
threadOps.add(ops1);
// pre-index every ID so none are missing:
for(int id=0;id<idCount;id++) {
int threadID = 0;
Operation op = new Operation();
op.threadID = threadID;
op.id = id;
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new NumericDocValuesField("thread", threadID));
doc.add(new StringField("id", "" + id, Field.Store.NO));
op.seqNo = w.addDocument(doc);
ops1.add(op);
}
// multiple threads update the same set of documents, and we randomly commit, recording the commit seqNo and then opening each commit in
// the end to verify it reflects the correct updates
for(int i=0;i<threads.length;i++) {
final List<Operation> ops;
if (i == 0) {
ops = threadOps.get(0);
} else {
ops = new ArrayList<>();
threadOps.add(ops);
}
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
for(int i=0;i<opCount;i++) {
Operation op = new Operation();
op.threadID = threadID;
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
op.seqNo = w.commit();
if (op.seqNo != -1) {
commits.add(op);
}
}
} else {
op.id = random().nextInt(idCount);
Term idTerm = new Term("id", "" + op.id);
op.seqNo = w.updateNumericDocValue(idTerm, "thread", threadID);
op.what = 0;
ops.add(op);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].setName("thread" + i);
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
if (commitOp.seqNo != -1) {
commits.add(commitOp);
}
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
int[] expectedThreadIDs = new int[idCount];
long[] seqNos = new long[idCount];
//System.out.println("TEST: " + commits.size() + " commits");
for(int i=0;i<commits.size();i++) {
// this commit point should reflect all operations <= this seqNo
long commitSeqNo = commits.get(i).seqNo;
//System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
Arrays.fill(expectedThreadIDs, -1);
Arrays.fill(seqNos, 0);
for(int threadID=0;threadID<threadOps.size();threadID++) {
long lastSeqNo = 0;
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
assert op.what == 0;
expectedThreadIDs[op.id] = threadID;
}
assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
IndexSearcher s = new IndexSearcher(r);
NumericDocValues docValues = MultiDocValues.getNumericValues(r, "thread");
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
// We pre-add all ids up front:
assert expectedThreadIDs[id] != -1;
assertEquals(1, hits.totalHits);
int actualThreadID = (int) docValues.get(hits.scoreDocs[0].doc);
if (expectedThreadIDs[id] != actualThreadID) {
System.out.println("FAIL: commit=" + i + " (of " + commits.size() + ") id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads + " reader=" + r + " commit=" + indexCommits.get(i));
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo);
}
}
}
assertEquals("id=" + id + " docID=" + hits.scoreDocs[0].doc, expectedThreadIDs[id], actualThreadID);
}
}
w.close();
r.close();
}
dir.close();
}
public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 5);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
List<List<Operation>> threadOps = new ArrayList<>();
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
// multiple threads update the same set of documents, and we randomly commit
for(int i=0;i<threads.length;i++) {
final List<Operation> ops = new ArrayList<>();
threadOps.add(ops);
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
startingGun.await();
for(int i=0;i<opCount;i++) {
Operation op = new Operation();
op.threadID = threadID;
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
op.seqNo = w.commit();
if (op.seqNo != -1) {
commits.add(op);
}
}
} else {
op.id = random().nextInt(idCount);
Term idTerm = new Term("id", "" + op.id);
if (random().nextInt(10) == 1) {
op.what = 1;
if (random().nextBoolean()) {
op.seqNo = w.deleteDocuments(idTerm);
} else {
op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
}
} else {
Document doc = new Document();
doc.add(new StoredField("threadop", threadID + "-" + ops.size()));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
if (random().nextBoolean()) {
List<Document> docs = new ArrayList<>();
docs.add(doc);
op.seqNo = w.addDocuments(docs);
} else {
op.seqNo = w.addDocument(doc);
}
op.what = 3;
}
ops.add(op);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].setName("thread" + threadID);
threads[i].start();
}
startingGun.countDown();
for(Thread thread : threads) {
thread.join();
}
Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
if (commitOp.seqNo != -1) {
commits.add(commitOp);
}
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
// how many docs with this id are expected:
int[] expectedCounts = new int[idCount];
long[] lastDelSeqNos = new long[idCount];
//System.out.println("TEST: " + commits.size() + " commits");
for(int i=0;i<commits.size();i++) {
// this commit point should reflect all operations <= this seqNo
long commitSeqNo = commits.get(i).seqNo;
//System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
// first find the highest seqNo of the last delete op, for each id, prior to this commit:
Arrays.fill(lastDelSeqNos, -1);
for(int threadID=0;threadID<threadOps.size();threadID++) {
long lastSeqNo = 0;
for(Operation op : threadOps.get(threadID)) {
if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
lastDelSeqNos[op.id] = op.seqNo;
}
// within one thread the seqNos must only increase:
assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
// then count how many adds happened since the last delete and before this commit:
Arrays.fill(expectedCounts, 0);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
expectedCounts[op.id]++;
}
}
}
DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
IndexSearcher s = new IndexSearcher(r);
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
int actualCount = s.count(new TermQuery(new Term("id", ""+id)));
if (expectedCounts[id] != actualCount) {
System.out.println("TEST: FAIL r=" + r + " id=" + id + " commitSeqNo=" + commitSeqNo);
for(int threadID=0;threadID<threadOps.size();threadID++) {
int opCount2 = 0;
for(Operation op : threadOps.get(threadID)) {
if (op.id == id) {
boolean shouldCount = op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id];
System.out.println(" id=" + id + " what=" + op.what + " threadop=" + threadID + "-" + opCount2 + " seqNo=" + op.seqNo + " vs lastDelSeqNo=" + lastDelSeqNos[op.id] + " shouldCount=" + shouldCount);
}
opCount2++;
}
}
TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1+actualCount);
for(ScoreDoc hit : hits.scoreDocs) {
System.out.println(" hit: " + s.doc(hit.doc).get("threadop"));
}
for(LeafReaderContext ctx : r.leaves()) {
System.out.println(" sub=" + ctx.reader());
Bits liveDocs = ctx.reader().getLiveDocs();
for(int docID=0;docID<ctx.reader().maxDoc();docID++) {
System.out.println(" docID=" + docID + " threadop=" + ctx.reader().document(docID).get("threadop") + (liveDocs != null && liveDocs.get(docID) == false ? " (deleted)" : ""));
}
}
assertEquals("commit " + i + " of " + commits.size() + " id=" + id + " reader=" + r, expectedCounts[id], actualCount);
}
}
w.close();
r.close();
}
dir.close();
}
public void testDeleteAll() throws Exception {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.deleteAll();
assertTrue(a < b);
long c = w.commit();
assertTrue(b < c);
w.close();
dir.close();
}
}

View File

@ -80,7 +80,7 @@ public class TestRollingUpdates extends LuceneTestCase {
if (s != null && updateCount < SIZE) { if (s != null && updateCount < SIZE) {
TopDocs hits = s.search(new TermQuery(idTerm), 1); TopDocs hits = s.search(new TermQuery(idTerm), 1);
assertEquals(1, hits.totalHits); assertEquals(1, hits.totalHits);
doUpdate = !w.tryDeleteDocument(r, hits.scoreDocs[0].doc); doUpdate = w.tryDeleteDocument(r, hits.scoreDocs[0].doc) == -1;
if (VERBOSE) { if (VERBOSE) {
if (doUpdate) { if (doUpdate) {
System.out.println(" tryDeleteDocument failed"); System.out.println(" tryDeleteDocument failed");

View File

@ -79,8 +79,6 @@ public class TestTryDelete extends LuceneTestCase
ReferenceManager<IndexSearcher> mgr = new SearcherManager(writer, ReferenceManager<IndexSearcher> mgr = new SearcherManager(writer,
new SearcherFactory()); new SearcherFactory());
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
IndexSearcher searcher = mgr.acquire(); IndexSearcher searcher = mgr.acquire();
TopDocs topDocs = searcher.search(new TermQuery(new Term("foo", "0")), TopDocs topDocs = searcher.search(new TermQuery(new Term("foo", "0")),
@ -90,10 +88,10 @@ public class TestTryDelete extends LuceneTestCase
long result; long result;
if (random().nextBoolean()) { if (random().nextBoolean()) {
IndexReader r = DirectoryReader.open(writer); IndexReader r = DirectoryReader.open(writer);
result = mgrWriter.tryDeleteDocument(r, 0); result = writer.tryDeleteDocument(r, 0);
r.close(); r.close();
} else { } else {
result = mgrWriter.tryDeleteDocument(searcher.getIndexReader(), 0); result = writer.tryDeleteDocument(searcher.getIndexReader(), 0);
} }
// The tryDeleteDocument should have succeeded: // The tryDeleteDocument should have succeeded:
@ -132,10 +130,9 @@ public class TestTryDelete extends LuceneTestCase
100); 100);
assertEquals(1, topDocs.totalHits); assertEquals(1, topDocs.totalHits);
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer); long result = writer.tryDeleteDocument(DirectoryReader.open(writer), 0);
long result = mgrWriter.tryDeleteDocument(DirectoryReader.open(writer), 0);
assertEquals(1, result); assertTrue(result != -1);
writer.commit(); writer.commit();
@ -175,11 +172,9 @@ public class TestTryDelete extends LuceneTestCase
100); 100);
assertEquals(1, topDocs.totalHits); assertEquals(1, topDocs.totalHits);
TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer); long result = writer.deleteDocuments(new TermQuery(new Term("foo", "0")));
long result = mgrWriter.deleteDocuments(new TermQuery(new Term("foo",
"0")));
assertEquals(1, result); assertTrue(result != -1);
// writer.commit(); // writer.commit();

View File

@ -40,29 +40,31 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase {
} }
@Override @Override
public void prepareCommit() throws IOException { public long prepareCommit() throws IOException {
prepareCommit(null); return prepareCommit(null);
} }
public void prepareCommit(Map<String, String> commitData) throws IOException { public long prepareCommit(Map<String, String> commitData) throws IOException {
this.prepareCommitData = commitData; this.prepareCommitData = commitData;
assertFalse("commit should not have been called before all prepareCommit were", commitCalled); assertFalse("commit should not have been called before all prepareCommit were", commitCalled);
if (failOnPrepare) { if (failOnPrepare) {
throw new IOException("failOnPrepare"); throw new IOException("failOnPrepare");
} }
return 1;
} }
@Override @Override
public void commit() throws IOException { public long commit() throws IOException {
commit(null); return commit(null);
} }
public void commit(Map<String, String> commitData) throws IOException { public long commit(Map<String, String> commitData) throws IOException {
this.commitData = commitData; this.commitData = commitData;
commitCalled = true; commitCalled = true;
if (failOnCommit) { if (failOnCommit) {
throw new RuntimeException("failOnCommit"); throw new RuntimeException("failOnCommit");
} }
return 1;
} }
@Override @Override

View File

@ -40,7 +40,6 @@ import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory; import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
@ -57,7 +56,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Is guaranteed to reflect deletes: // Is guaranteed to reflect deletes:
private SearcherManager nrtDeletes; private SearcherManager nrtDeletes;
private TrackingIndexWriter genWriter; private IndexWriter genWriter;
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread; private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread; private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
@ -219,7 +218,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
} }
genWriter = new TrackingIndexWriter(writer); genWriter = writer;
final SearcherFactory sf = new SearcherFactory() { final SearcherFactory sf = new SearcherFactory() {
@Override @Override
@ -311,9 +310,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch signal = new CountDownLatch(1); final CountDownLatch signal = new CountDownLatch(1);
LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
final TrackingIndexWriter writer = new TrackingIndexWriter(_writer); final SearcherManager manager = new SearcherManager(writer, false, false, null);
final SearcherManager manager = new SearcherManager(_writer, false, false, null);
Document doc = new Document(); Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES)); doc.add(newTextField("test", "test", Field.Store.YES));
writer.addDocument(doc); writer.addDocument(doc);
@ -334,7 +332,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
} }
}; };
t.start(); t.start();
_writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue
@ -373,7 +371,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
} }
thread.close(); thread.close();
thread.join(); thread.join();
_writer.close(); writer.close();
IOUtils.close(manager, d); IOUtils.close(manager, d);
} }
@ -389,14 +387,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
super(d, conf); super(d, conf);
this.latch = latch; this.latch = latch;
this.signal = signal; this.signal = signal;
} }
@Override @Override
public void updateDocument(Term term, public long updateDocument(Term term,
Iterable<? extends IndexableField> doc) Iterable<? extends IndexableField> doc)
throws IOException { throws IOException {
super.updateDocument(term, doc); long result = super.updateDocument(term, doc);
try { try {
if (waitAfterUpdate) { if (waitAfterUpdate) {
signal.countDown(); signal.countDown();
@ -405,6 +402,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new ThreadInterruptedException(e); throw new ThreadInterruptedException(e);
} }
return result;
} }
} }
@ -483,9 +481,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND); config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
final IndexWriter iw = new IndexWriter(dir, config); final IndexWriter iw = new IndexWriter(dir, config);
SearcherManager sm = new SearcherManager(iw, new SearcherFactory()); SearcherManager sm = new SearcherManager(iw, new SearcherFactory());
final TrackingIndexWriter tiw = new TrackingIndexWriter(iw);
ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread = ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
new ControlledRealTimeReopenThread<>(tiw, sm, maxStaleSecs, 0); new ControlledRealTimeReopenThread<>(iw, sm, maxStaleSecs, 0);
controlledRealTimeReopenThread.setDaemon(true); controlledRealTimeReopenThread.setDaemon(true);
controlledRealTimeReopenThread.start(); controlledRealTimeReopenThread.start();
@ -517,7 +514,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
d.add(new TextField("count", i + "", Field.Store.NO)); d.add(new TextField("count", i + "", Field.Store.NO));
d.add(new TextField("content", content, Field.Store.YES)); d.add(new TextField("content", content, Field.Store.YES));
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
long l = tiw.addDocument(d); long l = iw.addDocument(d);
controlledRealTimeReopenThread.waitForGeneration(l); controlledRealTimeReopenThread.waitForGeneration(l);
long wait = System.currentTimeMillis() - start; long wait = System.currentTimeMillis() - start;
assertTrue("waited too long for generation " + wait, assertTrue("waited too long for generation " + wait,

View File

@ -581,14 +581,14 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
} }
@Override @Override
public synchronized void commit() throws IOException { public synchronized long commit() throws IOException {
ensureOpen(); ensureOpen();
// LUCENE-4972: if we always call setCommitData, we create empty commits // LUCENE-4972: if we always call setCommitData, we create empty commits
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH); String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) { if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData())); indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
} }
indexWriter.commit(); return indexWriter.commit();
} }
/** Combine original user data with the taxonomy epoch. */ /** Combine original user data with the taxonomy epoch. */
@ -616,14 +616,14 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
* See {@link IndexWriter#prepareCommit}. * See {@link IndexWriter#prepareCommit}.
*/ */
@Override @Override
public synchronized void prepareCommit() throws IOException { public synchronized long prepareCommit() throws IOException {
ensureOpen(); ensureOpen();
// LUCENE-4972: if we always call setCommitData, we create empty commits // LUCENE-4972: if we always call setCommitData, we create empty commits
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH); String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) { if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData())); indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
} }
indexWriter.prepareCommit(); return indexWriter.prepareCommit();
} }
@Override @Override

View File

@ -130,14 +130,15 @@ public class RandomIndexWriter implements Closeable {
* Adds a Document. * Adds a Document.
* @see IndexWriter#addDocument(Iterable) * @see IndexWriter#addDocument(Iterable)
*/ */
public <T extends IndexableField> void addDocument(final Iterable<T> doc) throws IOException { public <T extends IndexableField> long addDocument(final Iterable<T> doc) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
long seqNo;
if (r.nextInt(5) == 3) { if (r.nextInt(5) == 3) {
// TODO: maybe, we should simply buffer up added docs // TODO: maybe, we should simply buffer up added docs
// (but we need to clone them), and only when // (but we need to clone them), and only when
// getReader, commit, etc. are called, we do an // getReader, commit, etc. are called, we do an
// addDocuments? Would be better testing. // addDocuments? Would be better testing.
w.addDocuments(new Iterable<Iterable<T>>() { seqNo = w.addDocuments(new Iterable<Iterable<T>>() {
@Override @Override
public Iterator<Iterable<T>> iterator() { public Iterator<Iterable<T>> iterator() {
@ -167,10 +168,12 @@ public class RandomIndexWriter implements Closeable {
} }
}); });
} else { } else {
w.addDocument(doc); seqNo = w.addDocument(doc);
} }
maybeFlushOrCommit(); maybeFlushOrCommit();
return seqNo;
} }
private void maybeFlushOrCommit() throws IOException { private void maybeFlushOrCommit() throws IOException {
@ -195,26 +198,29 @@ public class RandomIndexWriter implements Closeable {
} }
} }
public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.addDocuments(docs); long seqNo = w.addDocuments(docs);
maybeFlushOrCommit(); maybeFlushOrCommit();
return seqNo;
} }
public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException { public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.updateDocuments(delTerm, docs); long seqNo = w.updateDocuments(delTerm, docs);
maybeFlushOrCommit(); maybeFlushOrCommit();
return seqNo;
} }
/** /**
* Updates a document. * Updates a document.
* @see IndexWriter#updateDocument(Term, Iterable) * @see IndexWriter#updateDocument(Term, Iterable)
*/ */
public <T extends IndexableField> void updateDocument(Term t, final Iterable<T> doc) throws IOException { public <T extends IndexableField> long updateDocument(Term t, final Iterable<T> doc) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
long seqNo;
if (r.nextInt(5) == 3) { if (r.nextInt(5) == 3) {
w.updateDocuments(t, new Iterable<Iterable<T>>() { seqNo = w.updateDocuments(t, new Iterable<Iterable<T>>() {
@Override @Override
public Iterator<Iterable<T>> iterator() { public Iterator<Iterable<T>> iterator() {
@ -243,49 +249,51 @@ public class RandomIndexWriter implements Closeable {
} }
}); });
} else { } else {
w.updateDocument(t, doc); seqNo = w.updateDocument(t, doc);
} }
maybeFlushOrCommit(); maybeFlushOrCommit();
return seqNo;
} }
public void addIndexes(Directory... dirs) throws IOException { public long addIndexes(Directory... dirs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.addIndexes(dirs); return w.addIndexes(dirs);
} }
public void addIndexes(CodecReader... readers) throws IOException { public long addIndexes(CodecReader... readers) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.addIndexes(readers); return w.addIndexes(readers);
} }
public void updateNumericDocValue(Term term, String field, Long value) throws IOException { public long updateNumericDocValue(Term term, String field, Long value) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.updateNumericDocValue(term, field, value); return w.updateNumericDocValue(term, field, value);
} }
public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException { public long updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.updateBinaryDocValue(term, field, value); return w.updateBinaryDocValue(term, field, value);
} }
public void updateDocValues(Term term, Field... updates) throws IOException { public long updateDocValues(Term term, Field... updates) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.updateDocValues(term, updates); return w.updateDocValues(term, updates);
} }
public void deleteDocuments(Term term) throws IOException { public long deleteDocuments(Term term) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.deleteDocuments(term); return w.deleteDocuments(term);
} }
public void deleteDocuments(Query q) throws IOException { public long deleteDocuments(Query q) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.deleteDocuments(q); return w.deleteDocuments(q);
} }
public void commit() throws IOException { public long commit() throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
w.commit(); return w.commit();
} }
public int numDocs() { public int numDocs() {
@ -296,8 +304,8 @@ public class RandomIndexWriter implements Closeable {
return w.maxDoc(); return w.maxDoc();
} }
public void deleteAll() throws IOException { public long deleteAll() throws IOException {
w.deleteAll(); return w.deleteAll();
} }
public DirectoryReader getReader() throws IOException { public DirectoryReader getReader() throws IOException {