LUCENE-8232: Separate out PendingDeletes from ReadersAndUpdates

Today ReadersAndUpdates is tightly coupled with IW and all the
handling of pending deletes. This change decouples IW and pending
deletes from ReadersAndUpdates and makes PendingDeletes unit-testable.
Simon Willnauer 2018-03-29 17:21:59 +02:00
parent ab092942cf
commit acb3c37942
6 changed files with 404 additions and 154 deletions
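The point of the change shows in the new API: pending-delete accounting can now be driven without an IndexWriter. A minimal sketch of the package-private seam (the same calls the new TestPendingDeletes below makes; "dir" is any Directory, and the segment name and doc counts are illustrative):

    SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "demo", 10, false, Codec.getDefault(),
        Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
    SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
    PendingDeletes deletes = new PendingDeletes(null, commitInfo); // no reader and no IndexWriter required
    deletes.delete(3);                  // marks doc 3 deleted in a private, copy-on-write liveDocs
    assert deletes.numPendingDeletes() == 1;
    deletes.writeLiveDocs(dir);         // persists the live docs and advances the commit's delGen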

lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java

@@ -333,8 +333,8 @@ class BufferedUpdatesStream implements Accountable {
         if (success) {
           totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
           int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
-          assert fullDelCount <= segState.rld.info.info.maxDoc();
-          if (fullDelCount == segState.rld.info.info.maxDoc()) {
+          assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
+          if (segState.rld.isFullyDeleted()) {
             if (allDeleted == null) {
               allDeleted = new ArrayList<>();
             }

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

@@ -604,7 +604,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (!poolReaders && rld.refCount() == 1 && readerMap.containsKey(rld.info)) {
         // This is the last ref to this RLD, and we're not
         // pooling, so remove it:
-        if (rld.writeLiveDocs(directory)) {
+        boolean changed = rld.writeLiveDocs(directory);
+        changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+        if (changed) {
           // Make sure we only write del docs for a live segment:
           assert assertInfoLive == false || assertInfoIsLive(rld.info);
           // Must checkpoint because we just
@@ -616,9 +619,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           // did was move the state to disk:
           checkpointNoSIS();
         }
-
-        rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
-
         if (rld.getNumDVUpdates() == 0) {
           rld.dropReaders();
           readerMap.remove(rld.info);
@@ -635,13 +635,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }

     void writeAllDocValuesUpdates() throws IOException {
+      assert Thread.holdsLock(IndexWriter.this);
       Collection<ReadersAndUpdates> copy;
       synchronized (this) {
+        // this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException
         copy = new HashSet<>(readerMap.values());
       }
       boolean any = false;
       for (ReadersAndUpdates rld : copy) {
-        any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+        any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
       }
       if (any) {
         checkpoint();
@@ -649,11 +651,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     }

     void writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException {
+      assert Thread.holdsLock(IndexWriter.this);
       boolean any = false;
       for (SegmentCommitInfo info : infos) {
         ReadersAndUpdates rld = get(info, false);
         if (rld != null) {
-          any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+          any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
           rld.setIsMerging();
         }
       }
@@ -706,7 +709,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
           // Only acquire IW lock on each write, since this is a time consuming operation. This way
           // other threads get a chance to run in between our writes.
           synchronized (IndexWriter.this) {
-            rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+            if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {
+              checkpointNoSIS();
+            }
           }
           long bytesUsedAfter = rld.ramBytesUsed.get();
           ramBytesUsed -= bytesUsedBefore - bytesUsedAfter;
@@ -789,8 +794,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (rld != null) {
         assert rld.info == info;
         boolean changed = rld.writeLiveDocs(directory);
-        changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
+        changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
         if (changed) {
           // Make sure we only write del docs for a live segment:
@@ -838,7 +842,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         if (create == false) {
           return null;
         }
-        rld = new ReadersAndUpdates(IndexWriter.this, info);
+        rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, null, new PendingDeletes(null, info));
         // Steal initial reference:
         readerMap.put(info, rld);
       } else {
@@ -1147,7 +1151,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
         LeafReaderContext leaf = leaves.get(i);
         SegmentReader segReader = (SegmentReader) leaf.reader();
         SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs());
-        readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader));
+        readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), newReader, new PendingDeletes(newReader, newReader.getSegmentInfo())));
       }

       // We always assume we are carrying over incoming changes when opening from reader:
@@ -1637,8 +1641,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
       if (rld != null) {
         synchronized(bufferedUpdatesStream) {
           if (rld.delete(docID)) {
-            final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
-            if (fullDelCount == rld.info.info.maxDoc()) {
+            if (rld.isFullyDeleted()) {
               dropDeletedSegment(rld.info);
               checkpoint();
             }
@@ -4000,8 +4003,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     final boolean allDeleted = merge.segments.size() == 0 ||
       merge.info.info.maxDoc() == 0 ||
-      (mergedUpdates != null &&
-       mergedUpdates.getPendingDeleteCount() == merge.info.info.maxDoc());
+      (mergedUpdates != null && mergedUpdates.isFullyDeleted());

     if (infoStream.isEnabled("IW")) {
       if (allDeleted) {
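Note the shift of responsibility in the hunks above: writeFieldUpdates no longer checkpoints from inside ReadersAndUpdates (the old code called writer.checkpointNoSIS() itself); it now takes the global field-number map as an argument and merely reports whether it wrote files, so IndexWriter decides when to checkpoint. The resulting call-site pattern, as it appears in this diff:

    synchronized (IndexWriter.this) {
      if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {
        checkpointNoSIS(); // the caller owns the checkpoint now that ReadersAndUpdates holds no IndexWriter reference
      }
    }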

lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java (new file)

@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.io.IOException;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.LiveDocsFormat;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.TrackingDirectoryWrapper;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.MutableBits;
+
+/**
+ * This class handles accounting and applying pending deletes for live segment readers
+ */
+final class PendingDeletes {
+  private final SegmentCommitInfo info;
+  // True if the current liveDocs is referenced by an
+  // external NRT reader:
+  private boolean liveDocsShared;
+  // Holds the current shared (readable and writable)
+  // liveDocs. This is null when there are no deleted
+  // docs, and it's copy-on-write (cloned whenever we need
+  // to change it but it's been shared to an external NRT
+  // reader).
+  private Bits liveDocs;
+  private int pendingDeleteCount;
+
+  PendingDeletes(SegmentReader reader, SegmentCommitInfo info) {
+    this.info = info;
+    liveDocsShared = true;
+    liveDocs = reader != null ? reader.getLiveDocs() : null;
+    if (reader != null) {
+      pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount();
+    } else {
+      pendingDeleteCount = 0;
+    }
+  }
+
+  /**
+   * Marks a document as deleted in this segment and returns true if the document was actually
+   * deleted by this call, or false if it was already deleted.
+   */
+  boolean delete(int docID) throws IOException {
+    assert info.info.maxDoc() > 0;
+    if (liveDocsShared) {
+      // Copy on write: this means we've cloned a
+      // SegmentReader sharing the current liveDocs
+      // instance; must now make a private clone so we can
+      // change it:
+      LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
+      if (liveDocs == null) {
+        liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
+      } else {
+        liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
+      }
+      liveDocsShared = false;
+    }
+    assert liveDocs != null;
+    assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc();
+    assert !liveDocsShared;
+    final boolean didDelete = liveDocs.get(docID);
+    if (didDelete) {
+      ((MutableBits) liveDocs).clear(docID);
+      pendingDeleteCount++;
+    }
+    return didDelete;
+  }
+
+  /**
+   * Must be called if the live docs returned from {@link #getLiveDocs()} are shared outside of the
+   * {@link ReadersAndUpdates}.
+   */
+  void liveDocsShared() {
+    liveDocsShared = true;
+  }
+
+  /**
+   * Returns the current live docs, or null if all docs are live. The returned instance may be
+   * mutated behind the scenes. If the returned live docs are shared outside of the
+   * ReadersAndUpdates, {@link #liveDocsShared()} must be called first.
+   */
+  Bits getLiveDocs() {
+    return liveDocs;
+  }
+
+  /**
+   * Returns the number of pending deletes that are not yet written to disk.
+   */
+  int numPendingDeletes() {
+    return pendingDeleteCount;
+  }
+
+  /**
+   * Called once a new reader is opened for this segment, i.e. when deletes or updates are applied.
+   */
+  void onNewReader(SegmentReader reader, SegmentCommitInfo info) {
+    if (liveDocs == null) {
+      liveDocs = reader.getLiveDocs();
+    }
+  }
+
+  /**
+   * Resets the pending delete count.
+   */
+  void reset() {
+    pendingDeleteCount = 0;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("PendingDeletes(seg=").append(info);
+    sb.append(" numPendingDeletes=").append(pendingDeleteCount);
+    sb.append(" liveDocsShared=").append(liveDocsShared);
+    return sb.toString();
+  }
+
+  /**
+   * Writes the live docs to disk and returns <code>true</code> if any new deletes were written.
+   */
+  boolean writeLiveDocs(Directory dir) throws IOException {
+    if (pendingDeleteCount == 0) {
+      return false;
+    }
+
+    Bits liveDocs = this.liveDocs;
+    assert liveDocs != null;
+    // We have new deletes
+    assert liveDocs.length() == info.info.maxDoc();
+
+    // Do this so we can delete any created files on
+    // exception; this saves all codecs from having to do
+    // it:
+    TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
+
+    // We can write directly to the actual name (vs to a
+    // .tmp & renaming it) because the file is not live
+    // until segments file is written:
+    boolean success = false;
+    try {
+      Codec codec = info.info.getCodec();
+      codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
+      success = true;
+    } finally {
+      if (!success) {
+        // Advance only the nextWriteDelGen so that a 2nd
+        // attempt to write will write to a new file
+        info.advanceNextWriteDelGen();
+
+        // Delete any partially created file(s):
+        for (String fileName : trackingDir.getCreatedFiles()) {
+          IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
+        }
+      }
+    }
+
+    // If we hit an exc in the line above (eg disk full)
+    // then info's delGen remains pointing to the previous
+    // (successfully written) del docs:
+    info.advanceDelGen();
+    info.setDelCount(info.getDelCount() + pendingDeleteCount);
+    reset();
+    return true;
+  }
+
+  /**
+   * Returns <code>true</code> iff the segment represented by this {@link PendingDeletes} is fully deleted.
+   */
+  boolean isFullyDeleted() {
+    return info.getDelCount() + pendingDeleteCount == info.info.maxDoc();
+  }
+}
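The copy-on-write contract is the subtle part of this class: after liveDocsShared() is called, the instance previously handed out via getLiveDocs() is effectively frozen, and the next delete() clones the bits before mutating them. A short illustration of the contract (doc IDs are arbitrary; this mirrors what testDeleteDoc below verifies):

    Bits shared = deletes.getLiveDocs();
    deletes.liveDocsShared();                     // e.g. an NRT reader now holds on to 'shared'
    deletes.delete(7);                            // mutates a fresh clone, not 'shared'
    assert shared.get(7);                         // the reader's snapshot is unchanged
    assert deletes.getLiveDocs().get(7) == false; // the current view reflects the delete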

lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java

@@ -20,6 +20,7 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -34,7 +35,6 @@ import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.DocValuesConsumer;
 import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.FieldInfosFormat;
-import org.apache.lucene.codecs.LiveDocsFormat;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FlushInfo;
 import org.apache.lucene.store.IOContext;
@@ -43,38 +43,27 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
-import org.apache.lucene.util.MutableBits;

 // Used by IndexWriter to hold open SegmentReaders (for
 // searching or merging), plus pending deletes and updates,
 // for a given segment
-class ReadersAndUpdates {
+final class ReadersAndUpdates {
   // Not final because we replace (clone) when we need to
   // change it and it's been shared:
-  public final SegmentCommitInfo info;
+  final SegmentCommitInfo info;

   // Tracks how many consumers are using this instance:
   private final AtomicInteger refCount = new AtomicInteger(1);

-  private final IndexWriter writer;
-
   // Set once (null, and then maybe set, and never set again):
   private SegmentReader reader;

-  // Holds the current shared (readable and writable)
-  // liveDocs.  This is null when there are no deleted
-  // docs, and it's copy-on-write (cloned whenever we need
-  // to change it but it's been shared to an external NRT
-  // reader).
-  private Bits liveDocs;
-
   // How many further deletions we've done against
   // liveDocs vs when we loaded it or last wrote it:
-  private int pendingDeleteCount;
+  private final PendingDeletes pendingDeletes;

-  // True if the current liveDocs is referenced by an
-  // external NRT reader:
-  private boolean liveDocsShared;
+  // the major version this index was created with
+  private final int indexCreatedVersionMajor;

   // Indicates whether this segment is currently being merged. While a segment
   // is merging, all field updates are also registered in the
@@ -96,25 +85,23 @@ class ReadersAndUpdates {
   // Only set if there are doc values updates against this segment, and the index is sorted:
   Sorter.DocMap sortMap;

-  public final AtomicLong ramBytesUsed = new AtomicLong();
+  final AtomicLong ramBytesUsed = new AtomicLong();

-  public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
-    this.writer = writer;
+  ReadersAndUpdates(int indexCreatedVersionMajor, SegmentCommitInfo info, SegmentReader reader,
+                    PendingDeletes pendingDeletes) {
     this.info = info;
-    liveDocsShared = true;
+    this.pendingDeletes = pendingDeletes;
+    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
+    this.reader = reader;
   }

   /** Init from a previously opened SegmentReader.
    *
    * <p>NOTE: steals incoming ref from reader. */
-  public ReadersAndUpdates(IndexWriter writer, SegmentReader reader) {
-    this.writer = writer;
-    this.reader = reader;
-    info = reader.getSegmentInfo();
-    liveDocs = reader.getLiveDocs();
-    liveDocsShared = true;
-    pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount();
-    assert pendingDeleteCount >= 0: "got " + pendingDeleteCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
+  ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) {
+    this(indexCreatedVersionMajor, reader.getSegmentInfo(), reader, pendingDeletes);
+    assert pendingDeletes.numPendingDeletes() >= 0
+        : "got " + pendingDeletes.numPendingDeletes() + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
   }

   public void incRef() {
@@ -134,7 +121,7 @@ class ReadersAndUpdates {
   }

   public synchronized int getPendingDeleteCount() {
-    return pendingDeleteCount;
+    return pendingDeletes.numPendingDeletes();
   }

   private synchronized boolean assertNoDupGen(List<DocValuesFieldUpdates> fieldUpdates, DocValuesFieldUpdates update) {
@@ -186,6 +173,7 @@ class ReadersAndUpdates {
   // Call only from assert!
   public synchronized boolean verifyDocCounts() {
     int count;
+    Bits liveDocs = pendingDeletes.getLiveDocs();
     if (liveDocs != null) {
       count = 0;
       for(int docID=0;docID<info.info.maxDoc();docID++) {
@@ -197,7 +185,7 @@ class ReadersAndUpdates {
       count = info.info.maxDoc();
     }
-    assert info.info.maxDoc() - info.getDelCount() - pendingDeleteCount == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;
+    assert info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes() == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() + " pendingDeletes=" + pendingDeletes.numPendingDeletes() + " count=" + count;
     return true;
   }
@@ -205,12 +193,9 @@ class ReadersAndUpdates {
   public synchronized SegmentReader getReader(IOContext context) throws IOException {
     if (reader == null) {
       // We steal returned ref:
-      reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), context);
-      if (liveDocs == null) {
-        liveDocs = reader.getLiveDocs();
-      }
+      reader = new SegmentReader(info, indexCreatedVersionMajor, context);
+      pendingDeletes.onNewReader(reader, info);
     }

     // Ref for caller
     reader.incRef();
     return reader;
@@ -222,16 +207,7 @@ class ReadersAndUpdates {
   }

   public synchronized boolean delete(int docID) throws IOException {
-    initWritableLiveDocs();
-    assert liveDocs != null;
-    assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc();
-    assert !liveDocsShared;
-    final boolean didDelete = liveDocs.get(docID);
-    if (didDelete) {
-      ((MutableBits) liveDocs).clear(docID);
-      pendingDeleteCount++;
-    }
-    return didDelete;
+    return pendingDeletes.delete(docID);
   }

   // NOTE: removes callers ref
@@ -258,10 +234,11 @@ class ReadersAndUpdates {
       getReader(context).decRef();
       assert reader != null;
     }
-    // force new liveDocs in initWritableLiveDocs even if it's null
-    liveDocsShared = true;
+    // force new liveDocs
+    Bits liveDocs = pendingDeletes.getLiveDocs();
+    pendingDeletes.liveDocsShared();
     if (liveDocs != null) {
-      return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
+      return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
     } else {
       // liveDocs == null and reader != null. That can only be if there are no deletes
       assert reader.getLiveDocs() == null;
@@ -270,29 +247,12 @@ class ReadersAndUpdates {
     }
   }

-  private synchronized void initWritableLiveDocs() throws IOException {
-    assert info.info.maxDoc() > 0;
-    if (liveDocsShared) {
-      // Copy on write: this means we've cloned a
-      // SegmentReader sharing the current liveDocs
-      // instance; must now make a private clone so we can
-      // change it:
-      LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
-      if (liveDocs == null) {
-        liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
-      } else {
-        liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
-      }
-      liveDocsShared = false;
-    }
-  }
-
   public synchronized Bits getLiveDocs() {
-    return liveDocs;
+    return pendingDeletes.getLiveDocs();
   }

   public synchronized void dropChanges() {
-    assert Thread.holdsLock(writer);
     // Discard (don't save) changes when we are dropping
     // the reader; this is used only on the sub-readers
     // after a successful merge. If deletes had
@@ -300,7 +260,7 @@ class ReadersAndUpdates {
     // is running, by now we have carried forward those
     // deletes onto the newly merged segment, so we can
     // discard them on the sub-readers:
-    pendingDeleteCount = 0;
+    pendingDeletes.reset();
     dropMergingUpdates();
   }
@@ -308,47 +268,7 @@ class ReadersAndUpdates {
   // _X_N updates files) to the directory; returns true if it wrote any file
   // and false if there were no new deletes or updates to write:
   public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
-    if (pendingDeleteCount == 0) {
-      return false;
-    }
-
-    // We have new deletes
-    assert liveDocs.length() == info.info.maxDoc();
-
-    // Do this so we can delete any created files on
-    // exception; this saves all codecs from having to do
-    // it:
-    TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
-
-    // We can write directly to the actual name (vs to a
-    // .tmp & renaming it) because the file is not live
-    // until segments file is written:
-    boolean success = false;
-    try {
-      Codec codec = info.info.getCodec();
-      codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
-      success = true;
-    } finally {
-      if (!success) {
-        // Advance only the nextWriteDelGen so that a 2nd
-        // attempt to write will write to a new file
-        info.advanceNextWriteDelGen();
-
-        // Delete any partially created file(s):
-        for (String fileName : trackingDir.getCreatedFiles()) {
-          IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
-        }
-      }
-    }
-
-    // If we hit an exc in the line above (eg disk full)
-    // then info's delGen remains pointing to the previous
-    // (successfully written) del docs:
-    info.advanceDelGen();
-    info.setDelCount(info.getDelCount() + pendingDeleteCount);
-    pendingDeleteCount = 0;
-    return true;
+    return pendingDeletes.writeLiveDocs(dir);
   }

 @SuppressWarnings("synthetic-access")
@@ -404,7 +324,6 @@ class ReadersAndUpdates {
     if (fieldInfoIn != fieldInfo) {
       throw new IllegalArgumentException("wrong fieldInfo");
     }
-    final int maxDoc = reader.maxDoc();
     DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
     for(int i=0;i<subs.length;i++) {
       subs[i] = updatesToApply.get(i).iterator();
@@ -623,7 +542,7 @@ class ReadersAndUpdates {
     }
   }

-  private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat,
+  private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir,
                                                       FieldInfosFormat infosFormat) throws IOException {
     final long nextFieldInfosGen = info.getNextFieldInfosGen();
     final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX);
@@ -639,22 +558,15 @@ class ReadersAndUpdates {
     return trackingDir.getCreatedFiles();
   }

-  public synchronized boolean writeFieldUpdates(Directory dir, long maxDelGen, InfoStream infoStream) throws IOException {
+  public synchronized boolean writeFieldUpdates(Directory dir, FieldInfos.FieldNumbers fieldNumbers, long maxDelGen, InfoStream infoStream) throws IOException {
     long startTimeNS = System.nanoTime();
-    assert Thread.holdsLock(writer);
-
     final Map<Integer,Set<String>> newDVFiles = new HashMap<>();
     Set<String> fieldInfosFiles = null;
     FieldInfos fieldInfos = null;
     boolean any = false;
-    int count = 0;
     for (List<DocValuesFieldUpdates> updates : pendingDVUpdates.values()) {
       // Sort by increasing delGen:
-      Collections.sort(updates, (a, b) -> Long.compare(a.delGen, b.delGen));
-      count += updates.size();
+      Collections.sort(updates, Comparator.comparingLong(a -> a.delGen));
       for (DocValuesFieldUpdates update : updates) {
         if (update.delGen <= maxDelGen && update.any()) {
           any = true;
@@ -680,7 +592,7 @@ class ReadersAndUpdates {
     // IndexWriter.commitMergedDeletes).
     final SegmentReader reader;
     if (this.reader == null) {
-      reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE);
+      reader = new SegmentReader(info, indexCreatedVersionMajor, IOContext.READONCE);
     } else {
       reader = this.reader;
     }
@@ -688,7 +600,7 @@ class ReadersAndUpdates {
     try {
       // clone FieldInfos so that we can update their dvGen separately from
       // the reader's infos and write them to a new fieldInfos_gen file
-      FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap);
+      FieldInfos.Builder builder = new FieldInfos.Builder(fieldNumbers);
       // cannot use builder.add(reader.getFieldInfos()) because it does not
       // clone FI.attributes as well FI.dvGen
       for (FieldInfo fi : reader.getFieldInfos()) {
@@ -713,7 +625,7 @@ class ReadersAndUpdates {
       handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
       handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);

-      fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, docValuesFormat, codec.fieldInfosFormat());
+      fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, codec.fieldInfosFormat());
     } finally {
       if (reader != this.reader) {
         reader.close();
@@ -763,11 +675,12 @@ class ReadersAndUpdates {
     // if there is a reader open, reopen it to reflect the updates
     if (reader != null) {
-      SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
+      SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
       boolean success2 = false;
       try {
         reader.decRef();
         reader = newReader;
+        pendingDeletes.onNewReader(reader, info);
         success2 = true;
       } finally {
         if (success2 == false) {
@@ -792,14 +705,10 @@ class ReadersAndUpdates {
     }

     info.setDocValuesUpdatesFiles(newDVFiles);

-    // wrote new files, should checkpoint()
-    writer.checkpointNoSIS();
-
     if (infoStream.isEnabled("BD")) {
       infoStream.message("BD", String.format(Locale.ROOT, "done write field updates for seg=%s; took %.3fs; new files: %s",
                                              info, (System.nanoTime() - startTimeNS)/1000000000.0, newDVFiles));
     }
     return true;
   }
@@ -829,12 +738,11 @@ class ReadersAndUpdates {
     }

     SegmentReader reader = getReader(context);
-    int delCount = pendingDeleteCount + info.getDelCount();
+    int delCount = pendingDeletes.numPendingDeletes() + info.getDelCount();
     if (delCount != reader.numDeletedDocs()) {
       // beware of zombies:
       assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs();
+      Bits liveDocs = pendingDeletes.getLiveDocs();
       assert liveDocs != null;

       // Create a new reader with the latest live docs:
@@ -842,6 +750,7 @@ class ReadersAndUpdates {
       boolean success = false;
       try {
         reader.decRef();
+        pendingDeletes.onNewReader(newReader, info);
         success = true;
       } finally {
         if (success == false) {
@@ -851,7 +760,7 @@ class ReadersAndUpdates {
       reader = newReader;
     }

-    liveDocsShared = true;
+    pendingDeletes.liveDocsShared();

     assert verifyDocCounts();
@@ -877,8 +786,12 @@ class ReadersAndUpdates {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("ReadersAndLiveDocs(seg=").append(info);
-    sb.append(" pendingDeleteCount=").append(pendingDeleteCount);
-    sb.append(" liveDocsShared=").append(liveDocsShared);
+    sb.append(" pendingDeletes=").append(pendingDeletes);
     return sb.toString();
   }
+
+  public synchronized boolean isFullyDeleted() {
+    return pendingDeletes.isFullyDeleted();
+  }
 }

lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java (new file)

@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
+
+public class TestPendingDeletes extends LuceneTestCase {
+
+  public void testDeleteDoc() throws IOException {
+    RAMDirectory dir = new RAMDirectory();
+    SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 10, false, Codec.getDefault(),
+        Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
+    SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
+    PendingDeletes deletes = new PendingDeletes(null, commitInfo);
+    assertNull(deletes.getLiveDocs());
+    int docToDelete = TestUtil.nextInt(random(), 0, 7);
+    assertTrue(deletes.delete(docToDelete));
+    assertNotNull(deletes.getLiveDocs());
+    assertEquals(1, deletes.numPendingDeletes());
+
+    Bits liveDocs = deletes.getLiveDocs();
+    assertFalse(liveDocs.get(docToDelete));
+    assertFalse(deletes.delete(docToDelete)); // delete again
+
+    // make sure the bits are still live, i.e. mutable
+    assertTrue(liveDocs.get(8));
+    assertTrue(deletes.delete(8));
+    assertFalse(liveDocs.get(8));
+    assertEquals(2, deletes.numPendingDeletes());
+
+    deletes.liveDocsShared();
+    // once shared, the old instance must not be mutated; the next delete works on a private clone
+    assertTrue(liveDocs.get(9));
+    assertTrue(deletes.delete(9));
+    assertTrue(liveDocs.get(9));
+
+    liveDocs = deletes.getLiveDocs();
+    assertFalse(liveDocs.get(9));
+    assertFalse(liveDocs.get(8));
+    assertFalse(liveDocs.get(docToDelete));
+    assertEquals(3, deletes.numPendingDeletes());
+    dir.close();
+  }
+
+  public void testWriteLiveDocs() throws IOException {
+    RAMDirectory dir = new RAMDirectory();
+    SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 6, false, Codec.getDefault(),
+        Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
+    SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
+    PendingDeletes deletes = new PendingDeletes(null, commitInfo);
+    assertFalse(deletes.writeLiveDocs(dir));
+    assertEquals(0, dir.listAll().length);
+    boolean secondDocDeletes = random().nextBoolean();
+    deletes.delete(5);
+    if (secondDocDeletes) {
+      deletes.liveDocsShared();
+      deletes.delete(2);
+    }
+    assertEquals(0, commitInfo.getDelGen());
+    assertEquals(0, commitInfo.getDelCount());
+
+    assertEquals(secondDocDeletes ? 2 : 1, deletes.numPendingDeletes());
+    assertTrue(deletes.writeLiveDocs(dir));
+    assertEquals(1, dir.listAll().length);
+    Bits liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT);
+    assertFalse(liveDocs.get(5));
+    if (secondDocDeletes) {
+      assertFalse(liveDocs.get(2));
+    } else {
+      assertTrue(liveDocs.get(2));
+    }
+    assertTrue(liveDocs.get(0));
+    assertTrue(liveDocs.get(1));
+    assertTrue(liveDocs.get(3));
+    assertTrue(liveDocs.get(4));
+
+    assertEquals(0, deletes.numPendingDeletes());
+    assertEquals(secondDocDeletes ? 2 : 1, commitInfo.getDelCount());
+    assertEquals(1, commitInfo.getDelGen());
+
+    deletes.delete(0);
+    assertTrue(deletes.writeLiveDocs(dir));
+    assertEquals(2, dir.listAll().length);
+    liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT);
+    assertFalse(liveDocs.get(5));
+    if (secondDocDeletes) {
+      assertFalse(liveDocs.get(2));
+    } else {
+      assertTrue(liveDocs.get(2));
+    }
+    assertFalse(liveDocs.get(0));
+    assertTrue(liveDocs.get(1));
+    assertTrue(liveDocs.get(3));
+    assertTrue(liveDocs.get(4));
+
+    assertEquals(0, deletes.numPendingDeletes());
+    assertEquals(secondDocDeletes ? 3 : 2, commitInfo.getDelCount());
+    assertEquals(2, commitInfo.getDelGen());
+    dir.close();
+  }
+
+  public void testIsFullyDeleted() throws IOException {
+    RAMDirectory dir = new RAMDirectory();
+    SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 3, false, Codec.getDefault(),
+        Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
+    SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
+    PendingDeletes deletes = new PendingDeletes(null, commitInfo);
+    for (int i = 0; i < 3; i++) {
+      assertTrue(deletes.delete(i));
+      if (random().nextBoolean()) {
+        assertTrue(deletes.writeLiveDocs(dir));
+      }
+      assertEquals(i == 2, deletes.isFullyDeleted());
+    }
+  }
+}